
I am using Spark SQL 2.4.3 with Java. I have the scenario below:

val data = List(
  ("20", "score", "school", 14, 12),
  ("21", "score", "school", 13, 13),
  ("22", "rate",  "school", 11, 14),
  ("23", "score", "school", 11, 14),
  ("24", "rate",  "school", 12, 12),
  ("25", "score", "school", 11, 14)
)
val df = data.toDF("id", "code", "entity", "value1", "value2")
df.show

// This lookup data is populated from the DB.

val ll = List(
  ("aaaa", 11),
  ("aaa", 12),
  ("aa", 13),
  ("a", 14)
)
val codeValudeDf = ll.toDF("code", "value")
codeValudeDf.show

I need to replace "value1" and "value2" with the corresponding "code" from the lookup dataframe in the final output, but only for those rows whose "code" is "score" in the "data" dataframe.

How can I build a lookup hashmap from codeValudeDf, so that I get the output below?

+---+-----+------+------+------+
| id| code|entity|value1|value2|
+---+-----+------+------+------+
| 20|score|school|     a|   aaa|
| 21|score|school|    aa|    aa|
| 22| rate|school|    11|    14|
| 23|score|school|  aaaa|     a|
| 24| rate|school|    12|    12|
| 25|score|school|  aaaa|     a|
+---+-----+------+------+------+

Is there any way to make this lookup optimal, i.e. so that I do not have to pull the lookup data from the DB every time?

BdEngineer
  • Does this answer your question? [Lookup in Spark dataframes](https://stackoverflow.com/questions/41275539/lookup-in-spark-dataframes) – Srinivas Jul 16 '20 at 12:12

2 Answers


If the lookup data is small, you can collect it into a Map and broadcast it. The broadcast map can then be used easily in a UDF, as below.

Load the test data provided

val data = List(
  ("20", "score", "school", 14, 12),
  ("21", "score", "school", 13, 13),
  ("22", "rate",  "school", 11, 14),
  ("23", "score", "school", 11, 14),
  ("24", "rate",  "school", 12, 12),
  ("25", "score", "school", 11, 14)
)
val df = data.toDF("id", "code", "entity", "value1", "value2")
df.show
/**
  * +---+-----+------+------+------+
  * | id| code|entity|value1|value2|
  * +---+-----+------+------+------+
  * | 20|score|school|    14|    12|
  * | 21|score|school|    13|    13|
  * | 22| rate|school|    11|    14|
  * | 23|score|school|    11|    14|
  * | 24| rate|school|    12|    12|
  * | 25|score|school|    11|    14|
  * +---+-----+------+------+------+
  */

// This lookup data is populated from the DB.

val ll = List(
  ("aaaa", 11),
  ("aaa", 12),
  ("aa", 13),
  ("a", 14)
)
val codeValudeDf = ll.toDF("code", "value")
codeValudeDf.show
/**
  * +----+-----+
  * |code|value|
  * +----+-----+
  * |aaaa|   11|
  * | aaa|   12|
  * |  aa|   13|
  * |   a|   14|
  * +----+-----+
  */

The broadcast map can be used in a UDF as below:


// Collect the small lookup table to the driver as a Map(value -> code) and broadcast it
val lookUp = spark.sparkContext
  .broadcast(codeValudeDf.map { case Row(code: String, value: Integer) => value -> code }
    .collect().toMap)

// Resolve a numeric value to its code via the broadcast map
val look_up = udf((value: Integer) => lookUp.value.get(value))

df.withColumn("value1",
  when($"code" === "score", look_up($"value1")).otherwise($"value1".cast("string")))
  .withColumn("value2",
    when($"code" === "score", look_up($"value2")).otherwise($"value2".cast("string")))
  .show(false)
/**
  * +---+-----+------+------+------+
  * |id |code |entity|value1|value2|
  * +---+-----+------+------+------+
  * |20 |score|school|a     |aaa   |
  * |21 |score|school|aa    |aa    |
  * |22 |rate |school|11    |14    |
  * |23 |score|school|aaaa  |a     |
  * |24 |rate |school|12    |12    |
  * |25 |score|school|aaaa  |a     |
  * +---+-----+------+------+------+
  */
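
For comparison, the same output can also be produced without a UDF by broadcast-joining the small lookup table, once per value column. A minimal sketch, assuming the same df and codeValudeDf as above (the code1/v1 and code2/v2 renames are only there to avoid column-name collisions):

import org.apache.spark.sql.functions.{broadcast, col, when}

// Rename the lookup columns so the two joins don't collide with df's columns
val lk1 = broadcast(codeValudeDf.toDF("code1", "v1"))
val lk2 = broadcast(codeValudeDf.toDF("code2", "v2"))

df.join(lk1, df("value1") === lk1("v1"), "left")
  .join(lk2, df("value2") === lk2("v2"), "left")
  .select(
    col("id"), col("code"), col("entity"),
    when(col("code") === "score", col("code1"))
      .otherwise(col("value1").cast("string")).as("value1"),
    when(col("code") === "score", col("code2"))
      .otherwise(col("value2").cast("string")).as("value2"))
  .show(false)

Catalyst plans these as broadcast hash joins, so the lookup table is still shipped to the executors once rather than being re-read from the DB per task.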


Som
  • could you please tell what exactly this is doing here: .map{case Row(code: String, value: Integer) => value -> code} – BdEngineer Jul 16 '20 at 16:28
  • map on `Dataset[Row]` to convert it into `Dataset[(Integer, String)]`, then `collect` to `Array[(Integer, String)]`, then `toMap` to `Map[Integer, String]` (see the sketch after these comments) – Som Jul 16 '20 at 17:09
  • thank you... when I tried the solution it gives the error org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.DataFrameReader – BdEngineer Jul 16 '20 at 17:59
  • Try making the class in which you call spark.broadcast serializable – Som Jul 16 '20 at 18:23
  • I am not using any class now; I am executing it in my Zeppelin notebook and encountered this error – BdEngineer Jul 17 '20 at 04:37
  • I don't have a Zeppelin env to support you. Can you try it as a plain test case? – Som Jul 17 '20 at 05:20
  • any suggestion on this https://stackoverflow.com/questions/63074569/copy-current-row-modify-it-and-add-a-new-row-in-spark – BdEngineer Jul 24 '20 at 13:31
  • I've already answered similar query here- https://stackoverflow.com/a/63057478/4758823 – Som Jul 24 '20 at 18:34
  • hi, I have a use case like this, any advice please: https://stackoverflow.com/questions/63137437/doing-multiple-column-value-look-up-after-joining-with-lookup-dataset – BdEngineer Jul 28 '20 at 15:19
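
To spell out the conversion chain from the comments above, here is a small sketch with the intermediate types written explicitly (assuming the same spark session, codeValudeDf and spark.implicits._ as in the answer). Broadcasting only the plain Map, rather than capturing anything that holds a DataFrameReader, is one way to steer clear of the NotSerializableException mentioned above:

import org.apache.spark.sql.Row

// Dataset[Row] -> Dataset[(Integer, String)]
val pairsDs = codeValudeDf.map { case Row(code: String, value: Integer) => value -> code }

// Dataset[(Integer, String)] -> Array[(Integer, String)], collected to the driver
val pairs: Array[(Integer, String)] = pairsDs.collect()

// Array[(Integer, String)] -> Map[Integer, String]
val lookupMap: Map[Integer, String] = pairs.toMap

// Broadcast only the serializable Map, not the DataFrame it came from
val lookUp = spark.sparkContext.broadcast(lookupMap)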

Using a broadcast map indeed looks like a wise decision, as you do not need to hit your database to pull the lookup data every time.

Here I have solved the problem using a key-value map inside a UDF. I am unable to compare its performance with the broadcast-map approach, but would welcome input from Spark experts.

Step# 1: Building KeyValueMap -

val data = List(
  ("20", "score", "school", 14, 12),
  ("21", "score", "school", 13, 13),
  ("22", "rate",  "school", 11, 14),
  ("23", "score", "school", 11, 14),
  ("24", "rate",  "school", 12, 12),
  ("25", "score", "school", 11, 14)
)
val df = data.toDF("id", "code", "entity", "value1", "value2")

val ll = List(
  ("aaaa", 11),
  ("aaa", 12),
  ("aa", 13),
  ("a", 14)
)
val codeValudeDf = ll.toDF("code", "value")

// Zip the lookup values (as keys) with the codes (as values) into a Map
val Keys = codeValudeDf.select("value").collect().map(_(0).toString).toList
val Values = codeValudeDf.select("code").collect().map(_(0).toString).toList
val KeyValueMap = Keys.zip(Values).toMap

Step# 2: Creating UDF

def CodeToValue(code: String, key: String): String = {
  if (key == null) return ""
  if (code != "score") return key
  KeyValueMap.getOrElse(key, "not found!")
}

val CodeToValueUDF = udf(CodeToValue(_: String, _: String): String)

Step# 3: Adding derived columns using UDF in original dataframe

val newdf = df.withColumn("Col1", CodeToValueUDF(col("code"), col("value1")))
val finaldf = newdf.withColumn("Col2", CodeToValueUDF(col("code"), col("value2")))

finaldf.show(false)

+---+-----+------+------+------+----+----+
| id| code|entity|value1|value2|Col1|Col2|
+---+-----+------+------+------+----+----+
| 20|score|school|    14|    12|   a| aaa|
| 21|score|school|    13|    13|  aa|  aa|
| 22| rate|school|    11|    14|  11|  14|
| 23|score|school|    11|    14|aaaa|   a|
| 24| rate|school|    12|    12|  12|  12|
| 25|score|school|    11|    14|aaaa|   a|
+---+-----+------+------+------+----+----+
Shantanu Kher
  • You shouldn't use local variables directly inside a udf, even though it is possible. Spark has to ship a copy of the variable with each task, which is inefficient for big data, where plenty of tasks execute on each executor. To make it more performant you should broadcast it (a sketch of that variant follows these comments); more info: http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables – Som Jul 17 '20 at 01:06
  • Thank you @Someshwar for the insights. I have recently started learning Spark and am still in the learning phase. I agree with you. – Shantanu Kher Jul 17 '20 at 01:16
  • @Shantanu Kher can you tell me what is wrong with this broadcast variable accessing ? https://stackoverflow.com/questions/64003697/spark-broadcast-variable-map-giving-null-value – BdEngineer Sep 22 '20 at 05:48
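
For completeness, a minimal sketch of the broadcast variant Som suggests in the comment above (keyValueMapBc is a hypothetical name; it assumes the KeyValueMap, df and spark from this answer):

import org.apache.spark.sql.functions.{col, udf}

// Ship one copy of the map to each executor instead of one copy per task
val keyValueMapBc = spark.sparkContext.broadcast(KeyValueMap)

val codeToValueBcUDF = udf { (code: String, key: String) =>
  if (key == null) ""
  else if (code != "score") key
  else keyValueMapBc.value.getOrElse(key, "not found!")
}

df.withColumn("Col1", codeToValueBcUDF(col("code"), col("value1")))
  .withColumn("Col2", codeToValueBcUDF(col("code"), col("value2")))
  .show(false)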