If my Kafka topic receives records like
CHANNEL | VIEWERS | .....
ABC | 100 | .....
CBS | 200 | .....
And I have the following Spark Structured Streaming code to read and process the Kafka records:
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("TestPartition")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

val dataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092")
  .option("subscribe", "partition_test")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(value AS STRING)")

// I then use a custom UDF to transform each value into a specific object (testRec).
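For reference, the transformation I have in mind looks roughly like this (a simplified sketch only; the testRec fields and the pipe-delimited parsing are illustrative, not my exact UDF):

case class testRec(channel: String, viewers: Long)

// Illustrative parser: split the pipe-delimited value into a testRec.
def parseRec(value: String): testRec = {
  val fields = value.split("\\|").map(_.trim)
  testRec(fields(0), fields(1).toLong)
}

// dataFrame has a single string column, so .as[String] is safe here.
val records = dataFrame.as[String].map(parseRec)   // Dataset[testRec]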
Currently, I process the records using a ForeachWriter as follows:
import org.apache.spark.sql.ForeachWriter

val writer = new ForeachWriter[testRec] {
  // Called once per partition for each trigger; return true to process the partition.
  def open(partitionId: Long, version: Long): Boolean = true

  // Called for every record in the partition.
  def process(record: testRec): Unit = handle(record)

  def close(errorOrNull: Throwable): Unit = {}
}
val query = dataFrame.writeStream
  .format("console")
  .foreach(writer)
  .outputMode("append")
  .start()
The code works just fine. But what I would like to do is partition the incoming data by channel, so that each worker is responsible for specific channels and I can do in-memory computations related to that channel inside the handle() block. Is that possible? If yes, how do I do it?
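To make the question concrete, here is the kind of thing I imagine (only a sketch; records is the hypothetical Dataset[testRec] from the parsing sketch above, and I don't know whether repartition actually gives each worker a stable set of channels across batches):

// Hash-partition the stream by the channel column before writing.
val byChannel = records.repartition($"channel")

val partitionedQuery = byChannel.writeStream
  .foreach(writer)   // each open(partitionId, ...) would then see only the channels hashed to its partition
  .outputMode("append")
  .start()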