
I have a self-referencing protobuf schema:

message A {
  uint64 timestamp = 1;
  repeated A fields = 2;
}
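
For reference, this is roughly what ScalaPB generates for such a schema (heavily simplified; the real generated class has more members): a case class whose repeated field is a Seq of the same type, i.e. a self-referencing structure.

case class A(
  timestamp: Long = 0L,       // uint64 timestamp = 1
  fields: Seq[A] = Seq.empty  // repeated A fields = 2 -- the self-reference
)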

I am generating the corresponding Scala classes with ScalaPB and then trying to decode the messages consumed from a Kafka stream, following these steps:

import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {

  val spark = SparkSession.builder
    .master("local")
    .appName("spark session example")
    .getOrCreate()

  import spark.implicits._

  // Raw Kafka records; the protobuf payload is in the binary "value" column
  val ds1 = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "student")
    .load()

  // Deserialize each value with the ScalaPB-generated parser
  val ds2 = ds1.map(row => row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))

  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

  query.awaitTermination()
}

There is a related question here on StackOverflow.

However, Spark Structured Streaming throws a cyclic reference error at the following line:

val ds2 = ds1.map(row => row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))

I understand this is because of the recursive reference, which Spark can only handle at the driver (basically at the RDD or Dataset level). Has anyone figured out a workaround for this, for instance to enable the recursive parsing through a UDF?
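
For instance, I was hoping for something along these lines, where the recursive type never leaves the map closure and Spark only has to derive an encoder for a flat, non-recursive projection (just a sketch of the idea with a made-up FlatStudent class, assuming Student mirrors the message A schema above; I have not verified it):

// Defined at top level (not inside main) so Spark can derive an Encoder for it
case class FlatStudent(timestamp: Long, childCount: Int)

val ds2 = ds1
  .map(row => row.getAs[Array[Byte]]("value"))
  .map { bytes =>
    val msg = Student.parseFrom(bytes)            // recursion stays inside this closure
    FlatStudent(msg.timestamp, msg.fields.size)   // only the flat projection enters the Dataset
  }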


1 Answer


It turns out this is due to a limitation in the way Spark's architecture works. To process large amounts of data, the code is distributed to all the worker nodes along with a portion of the data, and the results are coordinated through the driver. Since there is nothing on a worker node to keep track of the stack, recursion is not allowed on a worker, only at the driver level.

In short, with the current build of Spark it is not possible to do this kind of recursive parsing in a distributed way. The best option is to move to Java, which has similar libraries and easily parses a recursive protobuf file.
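
If you want to stay in Scala, the driver-only restriction described above can at least be worked with through foreachBatch (available from Spark 2.4), collecting each micro-batch to the driver and running the recursive parse there. A rough, unverified sketch, reusing ds1 from the question and assuming each collected batch is small enough to fit on the driver:

import org.apache.spark.sql.Dataset

def parseBatch(batch: Dataset[Array[Byte]], batchId: Long): Unit = {
  // collect() brings the raw bytes to the driver, where Student.parseFrom
  // is plain JVM code -- no Spark Encoder for the recursive type is needed
  batch.collect().foreach(bytes => println(Student.parseFrom(bytes)))
}

val query = ds1
  .map(row => row.getAs[Array[Byte]]("value"))
  .writeStream
  .foreachBatch(parseBatch _)
  .start()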
