I am currently working on a streaming project using Apache Spark. I have two data sources: the first is news data that I read from Kafka, which is updated continuously; the second is a masterWord dictionary, a DataFrame that maps each word to its unique key.
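For illustration, masterWord contains rows roughly like this (the column names word and word_id are just how I will refer to them in the examples below):

+-------+--------+
|   word| word_id|
+-------+--------+
|  spark|       1|
|  kafka|       2|
+-------+--------+

So, for example, a news article tokenized as Seq("spark", "kafka") should become Seq(1, 2).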
I want to process the news data and convert each article from a Seq of words into a Seq of word IDs by matching the words against the masterWord dictionary. The problem is that I cannot access the masterWord DataFrame inside my UDF; when I try, Spark returns this error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException
Why can't a DataFrame be accessed inside a UDF?
What is the best practice for looking up values from another DataFrame?
This is my code:
// read data stream from Kafka
val kafka = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", PropertiesLoader.kafkaBrokerUrl)
.option("subscribe", PropertiesLoader.kafkaTopic)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", "100")
.load()
// Transform data stream to Dataframe
val kafkaDF = kafka.selectExpr("CAST(value AS STRING)").as[String]
.select(from_json($"value", ColsArtifact.rawSchema).as("data"))
.select("data.*")
.withColumn("raw_text", concat(col("title"), lit(" "), col("text"))) // add column aggregate title and text
// read master word dictionary
val readConfig = ReadConfig(Map("uri" -> "mongodb://10.252.37.112/prayuga", "database" -> "prayuga", "collection" -> "master_word_2"))
val masterWord = MongoSpark.load(spark, readConfig)
// UDF that looks words up in the masterWord dictionary
val aggregateMongo = udf((content: Seq[String]) => {
  masterWord.show()
  ...
  // code that queries masterWord to check whether each word in content exists in the dictionary
})
// call the UDF
val aggregateDF = kafkaDF.withColumn("text_aggregate", aggregateMongo(col("text_selected")))
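What I am considering instead is collecting masterWord to the driver as a plain Map and broadcasting it, so the UDF closes over the Map rather than the DataFrame. A rough sketch of what I mean (assuming the dictionary columns are called word and word_id, with word_id an Int):

import spark.implicits._

// collect the (small) dictionary to the driver and broadcast it
val wordMap: Map[String, Int] = masterWord
  .select("word", "word_id") // assumed column names
  .as[(String, Int)]
  .collect()
  .toMap
val wordMapBc = spark.sparkContext.broadcast(wordMap)

// the UDF now uses the broadcast Map instead of the DataFrame
val aggregateMongo = udf((content: Seq[String]) =>
  content.flatMap(word => wordMapBc.value.get(word)) // drops words missing from the dictionary
)

Is something like this the recommended approach, or should I explode the word column and join it against masterWord instead?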