
I want to pass a variable, not a column, to a UDF in Spark.

The map has the format described in "Spark dataframe to nested map".

val joinUDF = udf((replacementLookup: Map[String, Double], newValue: String) => {
    replacementLookup.get(newValue) match {
      case Some(tt) => tt
      case None => 0.0
    }
  })

It should be applied like this:

(columnsMap).foldLeft(df) {
    (currentDF, colName) =>
      {
        println(colName._1)
        println(colName._2)
        currentDF
          .withColumn("myColumn_" + colName._1, joinUDF(colName._2, col(colName._1)))
      }
  }

But it fails to compile with:

type mismatch;
[error]  found   : Map
[error]  required: org.apache.spark.sql.Column
[error]           .withColumn("myColumn_" + colName._1, joinUDF(colName._2, col(colName._1)))
Georg Heiler

2 Answers


You can use currying:

import org.apache.spark.sql.functions._
// toDF and $"..." need spark.implicits._ (already imported in spark-shell)
val df = Seq(("a", 1), ("b", 2)).toDF("StringColumn", "IntColumn")

def joinUDF(replacementLookup: Map[String, Double]) = udf((newValue: String) => {
  replacementLookup.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})

val myMap = Map("a" -> 1.5, "b" -> 3.0)

df.select(joinUDF(myMap)($"StringColumn")).show()
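
The reason this works can be seen without Spark: the outer function takes the map and returns a one-argument function that closes over it. Below is a minimal plain-Scala sketch of that closure; `LookupSketch` and `makeLookup` are hypothetical names used only for illustration, and `getOrElse` stands in for the `match` on `Some`/`None`.

```scala
object LookupSketch {
  // Takes the map once and returns a String => Double that captures it.
  // This is the function shape that udf(...) wraps in the answer above.
  def makeLookup(replacementLookup: Map[String, Double]): String => Double =
    (newValue: String) => replacementLookup.getOrElse(newValue, 0.0)

  val myMap: Map[String, Double] = Map("a" -> 1.5, "b" -> 3.0)

  // The map is bound here; only the String argument is left open.
  val lookup: String => Double = makeLookup(myMap)

  // Keys missing from the map fall back to 0.0.
  val results: Seq[Double] = Seq("a", "b", "missing").map(lookup)
}
```

Here `LookupSketch.results` evaluates to `List(1.5, 3.0, 0.0)`.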

Also, you can try with a broadcast variable:

import org.apache.spark.sql.functions._
val df = Seq(("a", 1), ("b", 2)).toDF("StringColumn", "IntColumn")

val myMap = Map("a" -> 1.5, "b" -> 3.0)
val broadcastedMap = sc.broadcast(myMap)

val joinUDF = udf((newValue: String) => {
  broadcastedMap.value.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})

df.select(joinUDF($"StringColumn")).show()
Daniel de Paula

If you want to pass literals to a UDF, use org.apache.spark.sql.functions.lit,

i.e. use joinUDF(lit(colName._2), col(colName._1)).

But maps aren't supported as literals here, so you have to rewrite your code, e.g. by applying the Map argument before creating the UDF:

val joinFunction = (replacementLookup: Map[String, Double], newValue: String) => {
  replacementLookup.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
}

columnsMap.foldLeft(df) {
  (currentDF, colName) =>
    {
      val joinUDF = udf(joinFunction(colName._2, _: String))
      currentDF
        .withColumn("myColumn_" + colName._1, joinUDF(col(colName._1)))
    }
}
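
The same fold-plus-partial-application pattern can be tried in plain Scala, with no Spark involved. In this rough sketch, `PartialApplicationSketch`, the contents of `columnsMap`, and the `row` map standing in for a DataFrame row are all made up for illustration; only `joinFunction` and the `"myColumn_"` prefix come from the code above.

```scala
object PartialApplicationSketch {
  // Same two-argument function as in the answer, written with getOrElse.
  val joinFunction: (Map[String, Double], String) => Double =
    (replacementLookup, newValue) => replacementLookup.getOrElse(newValue, 0.0)

  // Hypothetical nested map: column name -> (value -> replacement).
  val columnsMap: Map[String, Map[String, Double]] = Map(
    "col1" -> Map("a" -> 1.5, "b" -> 3.0),
    "col2" -> Map("x" -> 10.0)
  )

  // Hypothetical stand-in for one DataFrame row: column name -> value.
  val row: Map[String, String] = Map("col1" -> "b", "col2" -> "unknown")

  // For each column, fix the map argument first, then apply the remaining
  // String => Double to that column's value, as the UDF would per row.
  val derived: Map[String, Double] =
    columnsMap.foldLeft(Map.empty[String, Double]) {
      case (acc, (colName, lookup)) =>
        val joinForColumn: String => Double = joinFunction(lookup, _: String)
        acc + (("myColumn_" + colName) -> joinForColumn(row(colName)))
    }
}
```

`PartialApplicationSketch.derived` contains `myColumn_col1 -> 3.0` and `myColumn_col2 -> 0.0`. The partially applied function is bound to a `val` first so that the `_` placeholder expands only around the `joinFunction` call rather than around the whole tuple expression.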
Raphael Roth