I followed @Ralph Gonzalez's answer in this thread on reading Avro messages from Kafka using Structured Streaming in Spark 2.1, but I am getting the following error:
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:99)
at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:98)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I came across @Michael G. Noll's post here, which suggests using Avro's data-file reader (DataFileStream in the snippet below) instead of a BinaryDecoder:
// Datum reader built from `schema` (defined outside this snippet); it is handed to the stream below.
DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema);
// Wraps `inputStream` (defined outside this snippet) and uses datumReader to produce GenericRecord values.
// NOTE(review): presumably this expects Avro container-file framing rather than bare binary records — confirm against the Avro docs.
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(inputStream, datumReader);
I tried using this in Scala but was not successful. Below is the current state of the code.
/**
 * Entry point: subscribes to a Kafka topic with Spark Structured Streaming,
 * decodes each message value as an Avro record, and prints the record's
 * `payload` field to the console sink.
 *
 * Relies on `reader` and `decoder` being defined elsewhere in the enclosing
 * object (they are not created here).
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  import java.nio.ByteBuffer
  import java.nio.charset.StandardCharsets

  val KafkaBroker = "**.**.**.**:9092"
  val InTopic = "avro"

  // Get Spark session
  val session = SparkSession
    .builder
    .master("local[*]")
    .appName("myapp")
    .getOrCreate()

  // Load streaming data
  import session.implicits._

  // Earlier attempts, kept for reference:
  //val msg=data.selectExpr("CAST(value AS Array[Byte])")
  //val rec = reader.read(null, decoder.binaryDecoder(msg, null))
  //val disp=msg.writeStream.outputMode("append").format("console").start()

  val data = session
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KafkaBroker)
    .option("subscribe", InTopic)
    .load()
    .select($"value".as[Array[Byte]])
    .map { bytes =>
      // NOTE(review): binaryDecoder presumably expects bare Avro binary with no
      // framing. If the producer adds a header (e.g. a schema-registry prefix) or
      // writes Avro container files, decoding is likely to fail with
      // "Malformed data" — confirm the producer's wire format.
      val rec = reader.read(null, decoder.binaryDecoder(bytes, null))

      // The schema declares `payload` as Avro `bytes`, which a generic reader
      // hands back as a ByteBuffer; casting it to a single Byte would throw
      // ClassCastException.
      // Assumes the payload bytes are UTF-8 text — TODO confirm with the producer.
      val payload = rec.get("payload") match {
        case buf: ByteBuffer =>
          // duplicate() so decoding does not move the record's own buffer position
          StandardCharsets.UTF_8.decode(buf.duplicate()).toString
        case other =>
          String.valueOf(other)
      }
      KafkaMessage(payload)
    }

  val query = data.writeStream
    .outputMode("Append")
    .format("console")
    .start()

  // Block the driver until the streaming query stops.
  query.awaitTermination()
}
My schema and case class look like this:
/** Single-field record holding a message payload as a string. */
case class KafkaMessage(payload: String)
// Avro schema, as JSON text, for the `HdfsEvent` record: one field, `payload`, of type `bytes`.
val schemaString: String = """{
"type" : "record",
"name" : "HdfsEvent",
"namespace" : "com.expedia.txb.domain.hdfs",
"fields" : [ {
"name" : "payload",
"type" : {
"type" : "bytes",
"java-class" : "[B"
}
} ]
}"""
I have spent the last two days on this, so any help would be highly appreciated. Thanks.