2

I have a Spark (version 1.6) DataFrame, and I would like to add a column whose value comes from a Scala Map. This is my simplified code:

val map = Map("VAL1" -> 1, "VAL2" -> 2)
val df2 = df.withColumn("newVal", map(col("key")))

This code doesn't work, and I receive the following error, because the map expects a String while it is given a Column:

found   : org.apache.spark.sql.Column
required: String

The only way I could do that is by using a UDF:

val map = Map("VAL1" -> 1, "VAL2" -> 2)
val myUdf = udf { value: String => map(value) }
val df2 = df.withColumn("newVal", myUdf($"key"))

I want to avoid using UDFs if possible.

Are there any other solutions available using just the DataFrame API (I would also like to avoid transforming it to an RDD)?

Giorgio
  • You can work with `Dataset`, or convert to an RDD, get the map value, and convert back to a DataFrame again. – philantrovert May 18 '18 at 10:29
  • I think I can't, because I'm using Spark 1.6 and Dataset was still in beta. Could you provide an example using just the DataFrame API, if possible? I've updated my question. – Giorgio May 18 '18 at 10:31

2 Answers

3

TL;DR Just use udf.

With the version you use (Spark 1.6, according to your comment) there is no solution that doesn't require a udf or a map over an RDD / Dataset.
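For completeness, a minimal sketch of the udf route on 1.6 that also copes with keys missing from the map (the `df` and its `key` column are taken from the question; using `map.get` is a small variation on the asker's code, not part of the original answer):

    import org.apache.spark.sql.functions.{col, udf}
    
    val map = Map("VAL1" -> 1, "VAL2" -> 2)
    
    // Returning an Option makes keys missing from the map come out as null
    // instead of throwing a NoSuchElementException at runtime.
    val lookup = udf { value: String => map.get(value) }
    
    val df2 = df.withColumn("newVal", lookup(col("key")))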

In later versions you can:

  • use the map function (2.0 or later) to create a literal MapType column

    import org.apache.spark.sql.functions
    import org.apache.spark.sql.functions.lit   // lit is used unqualified below
    
    val map = functions.map(
      Map("VAL1" -> 1, "VAL2" -> 2)
        .flatMap { case (k, v) => Seq(k, v) }.map(lit).toSeq: _*
    )
    map($"key")
    
  • use typedLit (2.2 or later) to create a literal MapType column.

    val map = functions.typedLit(Map("VAL1" -> 1, "VAL2" -> 2))
    map($"key")
    

and use these directly.
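For example, with typedLit on 2.2+ the whole lookup becomes a one-liner (a sketch assuming the question's `df` and its `key` column; a key that is not in the map yields null):

    import org.apache.spark.sql.functions.{col, typedLit}
    
    // Literal MapType column built once from the Scala Map
    val mapCol = typedLit(Map("VAL1" -> 1, "VAL2" -> 2))
    
    // Element extraction on a map column: a missing key gives null, no UDF needed
    val df2 = df.withColumn("newVal", mapCol(col("key")))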

Reference: How to add a constant column in a Spark DataFrame?

2

You could convert the Map to a DataFrame and use a join between it and your existing DataFrame. Since the Map DataFrame would be very small, it should be a broadcast join, which avoids the need for a shuffle phase.

Letting Spark know to use a broadcast join is described in this answer: DataFrame join optimization - Broadcast Hash Join
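A minimal sketch of what that could look like on 1.6 (the `sqlContext` and the `key`/`newVal` column names are assumptions rather than the answerer's code):

    import org.apache.spark.sql.functions.broadcast
    import sqlContext.implicits._   // needed for .toDF on a local collection
    
    // Tiny lookup table built from the Scala Map
    val lookupDF = Map("VAL1" -> 1, "VAL2" -> 2).toSeq.toDF("key", "newVal")
    
    // The broadcast hint ships the small side to every executor, avoiding a shuffle;
    // a left outer join keeps rows whose key has no entry in the map (newVal is null)
    val df2 = df.join(broadcast(lookupDF), Seq("key"), "left_outer")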

mattinbits
  • Thanks @mattinbits, this did the trick. I refactored my code, creating a new `DataFrame` and joining it with the existing one. – Giorgio May 18 '18 at 15:13