
In my Spark Structured Streaming job, I am trying to read Confluent Avro messages from a Kafka topic, but I get "Malformed records are detected in record parsing."

I have tried debugging a lot but cannot figure out which record is malformed. How can I get the record from a row that is malformed? Is there a way to print the Avro message to see what is wrong with it?

My Code:

import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.from_avro // Spark 3.x location; needs the spark-avro package

object AvroReadMessage extends App {
  val spark = SparkSession.builder
    .master("local[*]")
    .appName("AvroReadMessage")
    .getOrCreate()
  spark.sparkContext.setLogLevel("WARN")
  import spark.implicits._ // enables the 'value Symbol-to-Column syntax

  // Reader schema for the Avro payload
  val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("/read_message.avsc")))

  val readKafkaDF = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic2")
    .option("startingOffsets", "latest")
    .load()

  // Parse mode option passed to from_avro
  val jmap = new java.util.HashMap[String, String]()
  jmap.put("mode", "PERMISSIVE")

  val query = readKafkaDF
    .select(from_avro('value, jsonFormatSchema, jmap) as 'value)
    .select("value.*")
    .writeStream
    .outputMode("append")
    .format("console")
    .start()

  query.awaitTermination()
}
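A minimal sketch for looking at the raw bytes of a message, written as plain Scala with no Spark dependency so it can be tried in isolation. The names InspectKafkaValue, toHex and looksLikeConfluent are hypothetical. Inside the streaming job itself, selecting org.apache.spark.sql.functions.hex('value) instead of the from_avro column and writing to the console sink with .option("truncate", false) prints the same bytes for each record. The looksLikeConfluent check is only a heuristic, assuming the Confluent wire format (magic byte 0, then a 4-byte schema ID); a plain Avro record can also happen to start with a 0 byte.

```scala
object InspectKafkaValue {
  // Render raw bytes as space-separated two-digit lowercase hex, e.g. "00 00 00 00 2a".
  // (b & 0xff) turns a signed Byte into its unsigned 0..255 value.
  def toHex(bytes: Array[Byte]): String =
    bytes.map(b => f"${b & 0xff}%02x").mkString(" ")

  // Heuristic only: Confluent's Avro serializer writes magic byte 0 followed by a
  // 4-byte schema ID before the Avro body, so such messages are at least 5 bytes
  // long and start with 0. Plain Avro binary has no such header.
  def looksLikeConfluent(bytes: Array[Byte]): Boolean =
    bytes.length >= 5 && bytes(0) == 0
}
```

If the hex dump of a failing record starts with 00 followed by four more bytes before the expected field data, from_avro is most likely trying to decode that header as part of the record.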

Any help would be highly appreciated.

OneCricketeer
JDev

1 Answer


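spark-avro is unable to read Confluent Schema Registry formatted data. Confluent's Avro serializer prepends a 5-byte header to every message (a magic byte 0 followed by a 4-byte schema ID), while from_avro expects plain Avro binary, so it tries to decode those header bytes as part of the record and reports it as malformed.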

For ways around this, please refer to "Integrating Spark Structured Streaming with the Confluent Schema Registry".
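As an illustration, a minimal sketch of splitting a Confluent-framed message into its schema ID and Avro body, assuming the Confluent wire format: byte 0 is the magic byte 0, bytes 1 to 4 are the schema ID as a big-endian int, and the rest is the Avro binary body. The names ConfluentWireFormat, Framed and split are hypothetical. In the Spark job, a commonly cited equivalent is from_avro(expr("substring(value, 6, length(value) - 5)"), jsonFormatSchema, jmap), which skips the header; it only works if every record was written with a schema compatible with the local .avsc file, because it discards the schema ID. Libraries such as ABRiS, which fetch the writer schema from the registry, avoid that limitation.

```scala
import java.nio.ByteBuffer

object ConfluentWireFormat {
  // Assumed layout: [magic byte 0][4-byte big-endian schema ID][Avro body]
  final case class Framed(schemaId: Int, avroBody: Array[Byte])

  def split(bytes: Array[Byte]): Either[String, Framed] =
    if (bytes.length < 5) Left(s"too short for a Confluent header: ${bytes.length} bytes")
    else if (bytes(0) != 0) Left(f"unexpected magic byte 0x${bytes(0) & 0xff}%02x")
    else {
      // ByteBuffer is big-endian by default; wrap(bytes, 1, 4) starts reading at index 1
      val schemaId = ByteBuffer.wrap(bytes, 1, 4).getInt
      Right(Framed(schemaId, bytes.drop(5)))
    }
}
```

Returning Either rather than throwing makes it easy to log the offending bytes for records that do not carry the header, which also helps find the malformed record the question asks about.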

OneCricketeer