I have a self-referencing protobuf schema:
message Student {
  uint64 timestamp = 1;
  repeated Student fields = 2;
}
I am generating the corresponding Scala classes using ScalaPB and then trying to decode the messages consumed from a Kafka stream. The generated case class is itself self-referencing, roughly like the sketch below.
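A simplified sketch of the generated class (the real ScalaPB output also extends GeneratedMessage and carries companion-object parsing machinery, which I have omitted here):

final case class Student(
    timestamp: Long = 0L,
    fields: Seq[Student] = Seq.empty // recursive: a Student contains Students
)

I then consume and decode the messages from the Kafka stream with the following steps: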
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder
    .master("local")
    .appName("spark session example")
    .getOrCreate()

  import spark.implicits._

  // Read the raw Kafka records; the protobuf bytes are in the "value" column.
  val ds1 = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "student")
    .load()

  // Extract the payload and parse it with the ScalaPB-generated parser.
  val ds2 = ds1
    .map(row => row.getAs[Array[Byte]]("value"))
    .map(Student.parseFrom(_))

  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

  query.awaitTermination()
}
There is a related question here on Stack Overflow.
However, Spark Structured Streaming throws a cyclic reference error at this line:

val ds2 = ds1
  .map(row => row.getAs[Array[Byte]]("value"))
  .map(Student.parseFrom(_))

I understand this is caused by the recursive reference, which Spark can handle only at the driver (basically at the RDD or Dataset level). Has anyone found a workaround, for instance one that enables recursive parsing through a UDF?
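One direction that might work (a sketch only, an assumption on my part that I have not verified end to end) is to sidestep Spark's reflective schema derivation entirely with a Kryo-based binary encoder, so the recursive type is carried as a single binary column:

import org.apache.spark.sql.{Encoder, Encoders}

// Possible workaround sketch: Encoders.kryo serializes the whole object
// into one binary column, so Spark never has to derive a (cyclic) schema
// for the recursive Student type.
implicit val studentEncoder: Encoder[Student] = Encoders.kryo[Student]

val ds2 = ds1
  .map(row => row.getAs[Array[Byte]]("value"))
  .map(Student.parseFrom(_))

The trade-off would be that the resulting Dataset is opaque to Spark SQL: it has a single binary column, so the console sink would print serialized bytes rather than readable fields. Would that be the right direction, or is there a cleaner way?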