
I am currently doing a streaming project using Apache Spark. I have two data sources: the first one reads news data from Kafka, and this data is updated continuously. The second one is a masterWord dictionary; this variable contains a DataFrame of words and each word's unique key.

I want to process the news data, converting each Seq of words into a Seq of word ids by matching the words against the masterWord dictionary. But I have a problem when accessing the masterWord DataFrame inside my UDF. When I try to access the DataFrame inside the UDF, Spark returns this error:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException

Why can't the DataFrame be accessed inside the UDF?

What is the best practice for getting values from another DataFrame?

This is my code:

// read data stream from Kafka
val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", PropertiesLoader.kafkaBrokerUrl)
  .option("subscribe", PropertiesLoader.kafkaTopic)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "100")
  .load()

// transform the data stream to a DataFrame
val kafkaDF = kafka.selectExpr("CAST(value AS STRING)").as[String]
  .select(from_json($"value", ColsArtifact.rawSchema).as("data"))
  .select("data.*")
  .withColumn("raw_text", concat(col("title"), lit(" "), col("text"))) // concatenate title and text into one column

// read master word dictionary
val readConfig = ReadConfig(Map("uri" -> "mongodb://10.252.37.112/prayuga", "database" -> "prayuga", "collection" -> "master_word_2"))
var masterWord = MongoSpark.load(spark, readConfig)

// UDF
val aggregateMongo = udf((content: Seq[String]) => {
  masterWord.show()
  ...
  // code that queries masterWord to check whether each word in content exists in the dictionary
})

// call UDF
val aggregateDF = kafkaDF.withColumn("text_aggregate", aggregateMongo(col("text_selected")))
  • The dataframe will be broadcast, but the broadcast value size, by default, can't be more than 10 MB. First make sure the dataframe is not null on the executor side, then check the size. – deo Mar 10 '19 at 08:16
  • @deo I run this program locally. I assume that if I show the dataframe, as in my code, I am running it on an executor, right? I showed it after loading it, and it was not null. But when I accessed it in the UDF it was null. – Muhammad Alfian Mar 10 '19 at 14:31
  • @user10465355 I am sorry, but I am not creating the dataframe; I load the data from data sources. Is there any reference for making my dataframe accessible inside a UDF? – Muhammad Alfian Mar 10 '19 at 14:34

2 Answers


The DataFrame lives in the Spark context, and as such it is only available on the driver. Each of the tasks sees a fraction (partition) of the data and can work with that. If you want to make the data in the DataFrame available inside a UDF, you have to serialize it to the driver and then broadcast it (or pass it as a parameter, which will essentially do the same), in which case Spark will send the whole thing to each instance of the UDF running.
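
For example, a minimal sketch of that approach for the question's case could look like this. It assumes masterWord is small enough to collect, and that it has columns named word and word_id of type String and Long; those column names and types are assumptions, not taken from the question:

import org.apache.spark.sql.functions.udf
import spark.implicits._

// collect the (assumed small) dictionary to the driver as a plain Map ...
val wordToId: Map[String, Long] = masterWord
  .select("word", "word_id") // assumed column names
  .as[(String, Long)]
  .collect()
  .toMap

// ... then broadcast it so each executor gets one read-only copy
val bcWordToId = spark.sparkContext.broadcast(wordToId)

// the UDF closes over the broadcast handle, not over the DataFrame
val aggregateMongo = udf((content: Seq[String]) =>
  content.flatMap(word => bcWordToId.value.get(word))
)

This way only the broadcast handle is captured in the UDF closure, so the NullPointerException from touching a DataFrame on an executor goes away.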

Arnon Rotem-Gal-Oz
  • Thanks for your answer. I am using Spark Structured Streaming. Is there any example code that I can try? I am a little bit confused about whether to broadcast this variable or pass it as a parameter to the UDF. – Muhammad Alfian Mar 10 '19 at 14:35
  • Actually it's possible to use DataFrames in UDFs, but only if you create a broadcast variable out of it (which has type `Broadcast[DataFrame]`). – Raphael Roth Mar 10 '19 at 20:03
  • @raphael broadcasting it serializes the dataframe to the driver and then sends it, as I wrote above – Arnon Rotem-Gal-Oz Mar 10 '19 at 21:00

If you want to use DataFrames inside UDFs, you must create a Broadcast:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import spark.implicits._

val df_name = Seq("Raphael").toDF("name")

val bc_df_name: Broadcast[DataFrame] = spark.sparkContext.broadcast(df_name)

// use df_name inside the udf through the broadcast handle
val udf_doSomething = udf(() => bc_df_name.value.as[String].first())

Seq(1, 2, 3)
  .toDF("i")
  .withColumn("test", udf_doSomething())
  .show()

gives

+---+-------+
|  i|   test|
+---+-------+
|  1|Raphael|
|  2|Raphael|
|  3|Raphael|
+---+-------+

This at least works in local mode, but I'm not sure whether it also works on clusters. Anyway, I would not recommend this approach: better to convert (collect) the content of the DataFrame into a Scala data structure on the driver (e.g. a Map) and broadcast that variable, or use a join instead. A sketch of the collect-and-broadcast variant follows.
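
As a hedged sketch of that recommended variant, applied to the same toy example (the name udf_doSomethingSafe is made up for illustration; it reuses the imports from the block above):

// collect the small DataFrame into an ordinary Scala collection on the driver ...
val names: Seq[String] = df_name.as[String].collect().toSeq

// ... and broadcast that collection instead of the DataFrame itself
val bc_names = spark.sparkContext.broadcast(names)

val udf_doSomethingSafe = udf(() => bc_names.value.head)

Seq(1, 2, 3)
  .toDF("i")
  .withColumn("test", udf_doSomethingSafe())
  .show()

This produces the same output as above, but the broadcast payload is plain data rather than a DataFrame, so it should behave the same in local mode and on a cluster.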

Raphael Roth