0

I have two dataframes,

val df1 = sqlContext.csvFile("/data/testData.csv")
val df2 = sqlContext.csvFile("/data/someValues.csv")


 df1=
 startTime  name    cause1  cause2
 15679       CCY    5         7
 15683              2         5
 15685              1         9
 15690              9         6

df2=
cause   description causeType
3       Xxxxx       cause1
1       xxxxx       cause1
3       xxxxx       cause2
4       xxxxx
2       Xxxxx

and I want to apply a complex function getTimeCust to both cause1 and cause2 to determine a final cause, then match the description of this final cause code in df2. I must have a new df (or rdd) with the following columns:

startTime   name    cause   descriptionCause

My solution was

  val rdd2 = df1.map(row => {
  val (cause, descriptionCause) = getTimeCust(row.getInt(2), row.getInt(3), df2)
  Row (row(0),row(1),cause,descriptionCause)
  })

If a run the code below I have a NullPointerException because the df2 is not visible.

The function getTimeCust(Int, Int, DataFrame) works well outside the map.

Bentech
  • 468
  • 5
  • 14
  • 1
    You can't. Do a simple search and you will find lots of answers which explain it but basically you can't use dataframe (or RDD) inside something which runs on executors such as map, UDF etc. IF you explain what getTimeCust does it might be possible to suggest an alternative (such as join) – Assaf Mendelson Jan 05 '17 at 11:22
  • I have edited my question. – Bentech Jan 05 '17 at 12:12
  • what exactly are you trying to achieve? what data do you need from df2? how do you use it? maybe add the code for gteTimeCust – Assaf Mendelson Jan 05 '17 at 12:20
  • question edited – Bentech Jan 05 '17 at 15:47
  • Its not a good idea to use an rdd inside a map of other rdd (and i think it's not posible). It looks like you are tring to calculate the cause, using cause1 and cause2 and take the descriptionCause in the same step. If this is the idea, try to make two separated steps, one calculating the cause without using the df2, and then aggregate the description with a join. – Alfilercio Jan 05 '17 at 16:09
  • How big are your DataFrames? If they are fairly small you can just use a hashmap instead. – puhlen Jan 10 '17 at 14:11

3 Answers3

2

Use df1.join(df2, <join condition>) to join your dataframes together then select the fields you need from the joined dataframe.

You can't use spark's distributed structures (rdd, dataframe, etc) in code that runs on an executor (like inside a map).

puhlen
  • 8,400
  • 1
  • 16
  • 31
  • This solution is not good, because the is a result of an operation as I said in my edited question. – Bentech Jan 10 '17 at 10:17
  • That shouldn't stop you from doing this. You can use the result of a function as a key, or calculate the key in a previous step like in Assaf's answer. – puhlen Jan 10 '17 at 14:11
0

Try something like this:

def f1(cause1: Int, cause2: Int): Int = some logic to calculate cause

import org.apache.spark.sql.functions.udf
val dfCause = df1.withColumn("df1_cause", udf(f1)($"cause1", $"cause2"))
val dfJoined = dfCause.join(df2, on= df1Cause("df1_cause")===df2("cause"))
dfJoined.select("cause", "description").show()
Assaf Mendelson
  • 12,701
  • 5
  • 47
  • 56
  • I have the following error. ` error: missing arguments for method f1; follow this method with `_' if you want to treat it as a partially applied function }} ` In affect, the function f1 returns 2 values the final cause and the name which is mandotory for the join. – Bentech Jan 10 '17 at 10:44
  • You don't need f1 to return the name, you already have it in the first column ("$name") so f1 should return just the calculated cause. The result of the join should contain all columns of both dataframes (and actually "cause" would appear twice, once as "cause" and once as "df1_cause"). As for the error message, you might need to do udf(f1 _) to tell it you are sending a function as opposed to applying it. see http://stackoverflow.com/questions/6650989/when-do-i-have-to-treat-my-methods-as-partially-applied-functions-in-scala – Assaf Mendelson Jan 10 '17 at 14:26
  • Thank you for the response. I don't want really the name of the row but the **name** of the type. I've edited my question by adding a column **causeType**. Then join condition will be on **cause** and **causeType** – Bentech Jan 10 '17 at 15:00
0

Thank you @Assaf. Thanks to your answer and the spark udf with data frame. I have resolved the this problem. The solution is:

   val getTimeCust= udf((cause1: Any, cause2: Any) => {
   var lastCause = 0
   var categoryCause=""
   var descCause=""
   lastCause= .............
   categoryCause= ........

    (lastCause, categoryCause)
  })

and after call the udf as:

  val dfWithCause = df1.withColumn("df1_cause", getTimeCust( $"cause1", $"cause2"))

ANd finally the join

 val dfFinale=dfWithCause.join(df2, dfWithCause.col("df1_cause._1") === df2.col("cause") and dfWithCause.col("df1_cause._2") === df2.col("causeType"),'outer' )
Community
  • 1
  • 1
Bentech
  • 468
  • 5
  • 14