LATER EDIT: It seems the problem is related to the Apache Zeppelin interpreter. I'm using Apache Zeppelin 0.6.0 on Spark 1.6.0. Running the same code in spark-shell (2.0.0) caused no issues.

This might be a bit too specific, but maybe it helps others who get similar errors with UDFs.

What I want is to create a column in a Spark DataFrame based on another column in that DF and a Seq of strings: create a column "urban" that holds 1 if the value in column "location" is in the sequence cities, and 0 otherwise.

I tried solving it in several different ways and get the same error each time. The final version is based on these posts: Use of Seq.contains(String) and Create new column with udf. This is what I have now:

val cities = Seq("london", "paris")
df.filter(lower($"location") isin (cities : _*)).count()

This returns Long = 5485947, so I do have records with those two locations.

import org.apache.spark.sql.functions._
val urbanFlag: (String => Int) = (arg: String) => {if (cities.contains(arg)) 1 else 0}
val urbf = udf(urbanFlag)
df.withColumn("urban", urbf(lower($"location"))).show(100)

When I run this I get "Job aborted due to stage failure" with the error:

java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$$$725d9ae18728ec9520b65ad133e3b55$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1

...and a huge stack trace. I'd guess it has something to do with the anonymous function, but what?

  • `$iwC$` is the classpath of code from the repl. I'd guess you can't use udfs from the repl. – Reactormonk Oct 04 '16 at 17:13
  • I think you are right. I moved the code to spark-shell and it works. I might not be using udfs correctly in Apache Zeppelin. – UrVal Oct 05 '16 at 09:05

1 Answer


Maybe there's an issue with the way you're defining the UDF? This works for me:

import org.apache.spark.sql.functions._

val data = sqlContext.read.json(sc.parallelize(Seq("{'location' : 'london'}", "{'location': 'tokyo'}")))

val cities = Seq("london", "paris")
val urbf = udf { city: String => if (cities.contains(city)) 1 else 0 }

data.select($"location", urbf($"location")).show

+--------+-------------+
|location|UDF(location)|
+--------+-------------+
|  london|            1|
|   tokyo|            0|
+--------+-------------+
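
To get the column name "urban" that the question asks for, the same UDF can be attached with withColumn instead of select (a quick sketch, assuming the same data and urbf as above; lower comes from the functions import):

data.withColumn("urban", urbf(lower($"location"))).show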

Note that I'm defining the UDF directly, i.e. without an intermediate function value.
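
For a simple membership flag like this you arguably don't need a UDF at all: a when/isin expression over the same cities sequence builds the column as a plain Catalyst expression, with no anonymous function shipped to the executors. A sketch, assuming the same data and cities as above (not tested on the Zeppelin setup from the question):

// when and lower come from org.apache.spark.sql.functions._ (imported above);
// isin is a method on Column
data.withColumn("urban", when(lower($"location").isin(cities: _*), 1).otherwise(0)).show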

  • It does not work on my side, same error. I also copied your code and ran it in a Zeppelin cell, same error. What version of Spark do you use? Mine is 1.6.0 as part of the CDH 5.7.0 distribution. – UrVal Oct 05 '16 at 07:53
  • Tried your code in spark-shell 2.0.0 and it worked. I also tried what I wrote and it worked too. It's a problem related to the Apache Zeppelin interpreter. – UrVal Oct 05 '16 at 09:06
  • 1
    @UrVal you should change your question to indicate it is Apache Zeppelin problem not a general Spark one – Arnon Rotem-Gal-Oz Oct 05 '16 at 09:11
  • The problem was somewhere else but I accepted your answer because it works and it's simpler than what I came up with. Learned something new so thanks. – UrVal Oct 05 '16 at 09:21
  • Can you please elaborate on where the problem was, exactly? I'm getting the same error without using UDF. – moshewe Jul 18 '18 at 08:47