
I'm trying to figure out the new dataframe API in Spark. It seems like a good step forward, but I'm having trouble doing something that should be pretty simple. I have a dataframe with two columns, "ID" and "Amt". As a generic example, say I want to return a new column called "Code" that holds a code based on the value of "Amt". I can write a function something like this:

def coder(myAmt: Integer): String = {
  if (myAmt > 100) "Little"
  else "Big"
}

When I try to use it like this:

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", coder(myDF("Amt")))

I get a type mismatch error:

found   : org.apache.spark.sql.Column
required: Integer

I've tried changing the input type on my function to org.apache.spark.sql.Column, but then the function fails to compile because the if statement wants a Boolean condition.

Am I doing this wrong? Is there a better/another way to do this than using withColumn?

Thanks for your help.

J Calbreath

3 Answers


Let's say you have an "Amt" column in your schema:

import org.apache.spark.sql.functions._

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

// A plain Scala function literal mapping an amount to a code
val coder: (Int => String) = (arg: Int) => { if (arg < 100) "little" else "big" }

// udf lifts the function so it can be applied to a Column
val sqlfunc = udf(coder)

myDF.withColumn("Code", sqlfunc(col("Amt")))

I think withColumn is the right way to add a column.
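If you also want to call the same function from SQL, it can be registered under a name. A minimal sketch continuing from the snippet above (the "coder" name and the "myTable" temp table are assumptions for illustration):

// Register the same function under a name usable from SQL
sqlContext.udf.register("coder", coder)

// Hypothetical temp table name, for illustration only
myDF.registerTempTable("myTable")
sqlContext.sql("SELECT ID, Amt, coder(Amt) AS Code FROM myTable")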

yjshen
  • I've never seen a function written in Scala the way you did above. By extension, if I had a more complex function with multiple arguments, would I write something like: val coder: ((Int, Int) => String) = (arg1: Int, arg2: Int) => { if (arg1 < 100 && arg2 < 100 ... ? – J Calbreath May 13 '15 at 17:35
  • @JCalbreath, it's a function literal in Scala, see this: http://stackoverflow.com/questions/5241147/what-is-a-function-literal-in-scala – yjshen May 13 '15 at 17:40
  • Is it possible to add a column of type Map[String,Any], so that I can parse JSON with a custom udf and save it into the dataframe as Scala's Map collection? – fpopic Jul 21 '16 at 13:46
  • I think in myDF.withColumn("Code", sqlfunc(col("Amt"))) the column needs to be qualified with the dataframe, i.e. myDF.withColumn("Code", sqlfunc(myDF.col("Amt"))) – BJC Nov 26 '16 at 03:46
  • @YijieShen, could you please help on this issue: https://stackoverflow.com/questions/55332897/how-to-add-new-column-to-datasetrow-using-map-function-on-the-dataset – Pyd Mar 25 '19 at 13:09
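For the multi-argument case raised in the first comment, udf also accepts functions of two or more arguments. A minimal sketch (the coder2 name and the "Qty" column are hypothetical):

import org.apache.spark.sql.functions.{col, udf}

// Two-argument function literal; the second "Qty" column is hypothetical
val coder2: (Int, Int) => String =
  (arg1: Int, arg2: Int) => if (arg1 < 100 && arg2 < 100) "little" else "big"

val sqlfunc2 = udf(coder2)
myDF.withColumn("Code", sqlfunc2(col("Amt"), col("Qty")))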

We should avoid defining udf functions wherever possible, because of the serialization and deserialization overhead they impose on column data, and because Spark cannot optimize through them.

You can achieve the same result with the built-in when function, as below:

import org.apache.spark.sql.functions.when

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

// when/otherwise is a built-in expression, so no udf is needed
myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big"))
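If more than two buckets are needed, when calls chain before the final otherwise. A minimal sketch (the "Medium" bucket and its threshold are made up for illustration):

import org.apache.spark.sql.functions.when

// when calls chain before the final otherwise; thresholds are hypothetical
myDF.withColumn("Code",
  when(myDF("Amt") < 100, "Little")
    .when(myDF("Amt") < 1000, "Medium")
    .otherwise("Big"))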
Ramesh Maharjan

Another way of doing this: you can write whatever function you like, but given the error above, you should define it as a udf value rather than call it directly on a Column.

Example:

import org.apache.spark.sql.functions.udf

// Wrapping the function in udf makes it applicable to a Column
val coder = udf((myAmt: Integer) => {
  if (myAmt > 100) "Little"
  else "Big"
})

Now this statement works perfectly:

myDF.withColumn("Code", coder(myDF("Amt")))
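The same udf also works inside select, if you prefer building an explicit projection instead of appending a column. A minimal sketch, assuming the "ID" and "Amt" columns from the question:

// Same udf applied inside select, building an explicit projection
myDF.select(myDF("ID"), myDF("Amt"), coder(myDF("Amt")).as("Code"))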
imran