I'm trying to write a Spark Structured Streaming job that consumes Kafka messages encoded in Protobuf.
Here is what I have tried over the last few days:
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import scalapb.spark._ // sparksql-scalapb: adds protoToDataFrame to sqlContext (used below)
import spark.implicits._

// Deserialize a Protobuf-encoded Kafka value into the ScalaPB-generated class
def parseLine(str: Array[Byte]): ProtoSchema = ProtoSchema.parseFrom(str)
val storageLoc: String = "/tmp/avl/output"
val checkpointLoc: String = "/tmp/avl/checkpoint"
// Read the raw Kafka stream; each record's value column holds the Protobuf payload
val dfStreamReader: DataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("failOnDataLoss", value = false)
.option("subscribe", topics)
.load()
// Pull out the binary Kafka value column
val dfStreamReaderValues: Dataset[Array[Byte]] = dfStreamReader.map(row => row.getAs[Array[Byte]]("value"))
// This is the line that fails to compile (see the exception below)
val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))
val dfRaw: DataFrame = spark.sqlContext.protoToDataFrame(rddProtoSchema.rdd)
val streamWriterAirline: StreamingQuery = dfRaw.writeStream
.format("parquet")
.option("path", storageLoc)
.option("checkpointLocation", checkpointLoc)
.outputMode(OutputMode.Append())
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
spark.streams.awaitAnyTermination(20000)
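
For completeness, here is a sketch of the relevant build dependencies (the version numbers below are placeholders, not the exact versions I have pinned):

// build.sbt (sketch; version numbers are placeholders)
libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "sparksql-scalapb" % "0.8.0",
  "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"
)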
With ScalaPB, I manage to decode a binary proto file and convert it to a DataFrame in batch mode. But with streaming, I get this exception at compile time on the parsing line:
val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))
>>>>>
scala.ScalaReflectionException: <none> is not a term
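
For reference, the non-streaming version works for me; here is a minimal sketch of it (the input path is a placeholder, and it relies on the same sqlContext.protoToDataFrame extension from sparksql-scalapb used above):

import java.nio.file.{Files, Paths}
import org.apache.spark.sql.DataFrame
import scalapb.spark._

// Read one Protobuf-encoded record from a local file and decode it with ScalaPB
val bytes: Array[Byte] = Files.readAllBytes(Paths.get("/tmp/avl/input.bin")) // placeholder path
val record: ProtoSchema = ProtoSchema.parseFrom(bytes)

// Converting the decoded message to a DataFrame works fine in batch mode
val dfBatch: DataFrame = spark.sqlContext.protoToDataFrame(spark.sparkContext.parallelize(Seq(record)))
dfBatch.show()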
My guess is that Spark cannot derive an Encoder for the ScalaPB-generated ProtoSchema class through its usual reflection machinery, but I don't know the right way to provide one. Can anyone give me a hint?