
This is a Spark Structured Streaming app that consumes Kafka messages encoded as Protocol Buffers, using the ScalaPB library. I am getting the following error. Please help.

> com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.
>   at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:82)
>   at com.google.protobuf.CodedInputStream.skipRawBytesSlowPath(CodedInputStream.java:1284)
>   at com.google.protobuf.CodedInputStream.skipRawBytes(CodedInputStream.java:1267)
>   at com.google.protobuf.CodedInputStream.skipField(CodedInputStream.java:198)
>   at com.example.protos.demo.Student.mergeFrom(Student.scala:59)
>   at com.example.protos.demo.Student.mergeFrom(Student.scala:11)
>   at com.trueaccord.scalapb.LiteParser$.parseFrom(LiteParser.scala:9)
>   at com.trueaccord.scalapb.GeneratedMessageCompanion$class.parseFrom(GeneratedMessageCompanion.scala:103)
>   at com.example.protos.demo.Student$.parseFrom(Student.scala:88)
>   at com.trueaccord.scalapb.GeneratedMessageCompanion$class.parseFrom(GeneratedMessageCompanion.scala:119)
>   at com.example.protos.demo.Student$.parseFrom(Student.scala:88)
>   at StudentConsumer$.StudentConsumer$$parseLine$1(StudentConsumer.scala:24)
>   at StudentConsumer$$anonfun$1.apply(StudentConsumer.scala:30)
>   at StudentConsumer$$anonfun$1.apply(StudentConsumer.scala:30)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)

The following is my code...

object StudentConsumer {
  import com.trueaccord.scalapb.spark._
  import org.apache.spark.sql.SparkSession
  import com.example.protos.demo._

  def main(args: Array[String]) {

    val spark = SparkSession.builder
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    // Base64-decode the Kafka value and parse it as a Student protobuf
    def parseLine(s: String): Student =
      Student.parseFrom(
        org.apache.commons.codec.binary.Base64.decodeBase64(s))

    val ds1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "student")
      .load()

    val ds2 = ds1.selectExpr("CAST(value AS String)").as[String].map(str => parseLine(str))

    val query = ds2.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
shylas

2 Answers


Based on the error, it looks like the messages you are trying to parse are either truncated or corrupted. Is the sender encoding the protobufs in base64 prior to sending them to Kafka?

If so, it's worth adding println(s) inside parseLine and seeing whether what you get looks like what you expect (maybe the CAST(value AS String) has some unintended consequence for your input).
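For illustration only, here is a minimal sketch of that check, based on the question's parseLine; the println calls and the length check are debugging additions, not part of the original code:

    // Sketch only: the question's parseLine with extra logging for debugging.
    def parseLine(s: String): Student = {
      // Show the raw string that arrived; a base64 payload should look like printable ASCII.
      println(s"raw value: $s")
      val bytes = org.apache.commons.codec.binary.Base64.decodeBase64(s)
      // A suspiciously short byte count suggests the value was truncated or never base64-encoded.
      println(s"decoded ${bytes.length} bytes")
      Student.parseFrom(bytes)
    }

If the printed value does not look like base64 text, the cast of the binary Kafka value to String is probably corrupting the payload before it ever reaches the parser.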

Finally, the following Kafka/Spark Streaming/ScalaPB example may be helpful to you; it assumes the messages are sent to Kafka as raw bytes:

https://github.com/thesamet/sbtb2016-votes/blob/master/spark/src/main/scala/votes/Aggregator.scala

thesamet
  • Thanks, I am using that as a reference, but I am messing up when trying to make it work for Structured Streaming. I am using spark.readStream. Please help in converting the following code snippet to work for Structured Streaming: val votesAsBytes = KafkaUtils.createDirectStream[String, Array[Byte]](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), kafkaParams)); val votes: DStream[Vote] = votesAsBytes.map { (cr: ConsumerRecord[String, Array[Byte]]) => Vote.parseFrom(cr.value()) } – shylas Nov 17 '16 at 17:05

Thanks @thesamet for the feedback.

The following code works...

object StudentConsumer {
  import com.trueaccord.scalapb.spark._
  import org.apache.spark.sql.SparkSession
  import com.example.protos.demo._

  def main(args: Array[String]) {

    val spark = SparkSession.builder
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    val ds1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "student")
      .load()

    // Read the Kafka value column as raw bytes and parse it directly with ScalaPB,
    // instead of casting the binary payload to a String first.
    val ds2 = ds1.map(row => row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))

    val query = ds2.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
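The key difference from the question is that the Kafka value is read back as Array[Byte] and parsed directly, which assumes the producer writes the serialized protobuf bytes as-is (no base64). For completeness, a minimal producer sketch under that assumption; the producer code is illustrative and not from this thread, and only the broker address and topic name are taken from the question:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import com.example.protos.demo.Student

object StudentProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

    val producer = new KafkaProducer[String, Array[Byte]](props)
    val student = Student() // hypothetical instance; populate the actual fields here
    // Send the serialized protobuf bytes as-is: no base64, no string conversion.
    producer.send(new ProducerRecord[String, Array[Byte]]("student", student.toByteArray))
    producer.close()
  }
}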
shylas