I have a streaming query that reads from Kafka as its source. I want to run some logic on each batch that arrives from the stream. Here's what I have so far:
import org.apache.spark.sql.{ForeachWriter, Row}

val streamDF = spark
  .readStream
  ...
  .load()

// val bc = spark.sparkContext.broadcast(spark)

streamDF
  .writeStream
  .foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, version: Long): Boolean = true

    override def process(record: Row): Unit = {
      // build a small DataFrame from scratch inside the writer
      val aRDD = spark.sparkContext.parallelize(Seq("a", "b", "c"))
      val aDF = spark.createDataFrame(aRDD.map(Tuple1(_)))
      // val aDF = bc.value.createDataFrame(aRDD.map(Tuple1(_)))
      // do something with aDF
    }

    override def close(errorOrNull: Throwable): Unit = {}
  })
  .start()
I'm using Spark 2.3.2, so I'm stuck with ForeachWriter (foreachBatch only arrived in Spark 2.4; it would have made my life much simpler). I'm also aware that foreach() runs on the executors, so, keeping that in mind, I tried broadcasting the SparkSession to all the executors. That did not help either; that attempt is the commented-out part of the code snippet above.
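For reference, this is the Spark 2.4+ pattern I wish I could use: foreachBatch hands each micro-batch to a function on the driver, where the SparkSession is usable. This is just a sketch of what I mean, not something I can run on 2.3.2:

streamDF
  .writeStream
  .foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, batchId: Long) =>
    // batchDF is an ordinary DataFrame on the driver, so joins and actions work here
    batchDF.persist()
    // ... heavy DataFrame/Dataset logic, including actions ...
    batchDF.unpersist()
  }
  .start()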
What I'm looking for is a way to process the data as a DataFrame inside foreach on Spark 2.3.2. I have to use DataFrames/Datasets, since the operations are pretty heavy, and they include actions as well.
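To make "pretty heavy" concrete, the per-batch logic I need is roughly of this shape (batchDF, lookupDF, "key" and the output path are all placeholders, not my real code):

// hypothetical example of the kind of work each batch needs
val enriched = batchDF.join(lookupDF, Seq("key"))  // join against a reference table
val summary = enriched.groupBy("key").count()
if (summary.head(1).nonEmpty) {                    // an action, which needs the driver-side SparkSession
  summary.write.mode("append").parquet("/some/output/path")
}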
I found a similar question, but there is no response on it --> similar q