In my Spark streaming job, I am trying to read Confluent Avro messages from a Kafka topic, and I get the error "Malformed records are detected in record parsing."
I have spent a lot of time debugging but cannot figure out which record is malformed. How can I get the malformed record out of a row? Is there a way to print the Avro message so I can see what is wrong with it?
My code:

    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Paths}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.avro.functions.from_avro
    import org.apache.spark.sql.functions.col

    object AvroReadMessage extends App {
      val spark = SparkSession.builder
        .master("local[*]")
        .appName("AvroReadMessage")
        .getOrCreate()
      spark.sparkContext.setLogLevel("WARN")

      // Avro schema (as JSON) used to decode the Kafka message value
      val jsonFormatSchema = new String(
        Files.readAllBytes(Paths.get("/read_message.avsc")), StandardCharsets.UTF_8)

      val readKafkaDF = spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic2")
        .option("startingOffsets", "latest")
        .load()

      // Options passed to from_avro (parse mode)
      val jmap = new java.util.HashMap[String, String]()
      jmap.put("mode", "PERMISSIVE")

      val query = readKafkaDF
        .select(from_avro(col("value"), jsonFormatSchema, jmap).as("value"))
        .select("value.*")
        .writeStream.outputMode("append").format("console").start()
      query.awaitTermination()
    }
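A possible way to look at the raw bytes before they reach `from_avro`, as a minimal sketch. It assumes the messages use the Confluent wire format (one magic byte `0`, a 4-byte big-endian schema ID, then the Avro payload); as far as I know, the open-source `from_avro` expects plain Avro binary and does not strip that header. `ConfluentHeader`, `describe`, and `stripHeader` are hypothetical names for illustration, not part of Spark or any Confluent library.

```scala
import java.nio.ByteBuffer

// Hypothetical helper, assuming the Confluent wire format:
// byte 0 = magic byte (0), bytes 1-4 = schema ID (big-endian), rest = Avro payload.
object ConfluentHeader {
  private val HeaderSize = 5

  // Summarise the start of a Kafka record value so it can be printed or logged.
  def describe(value: Array[Byte]): String =
    if (value == null || value.length < HeaderSize)
      "too short for a Confluent header"
    else if (value(0) != 0)
      f"no magic byte, first byte is 0x${value(0) & 0xff}%02x"
    else {
      val schemaId = ByteBuffer.wrap(value, 1, 4).getInt
      s"schemaId=$schemaId, avroPayloadBytes=${value.length - HeaderSize}"
    }

  // Drop the 5-byte header so only the Avro payload remains.
  def stripHeader(value: Array[Byte]): Array[Byte] = value.drop(HeaderSize)
}
```

In the job above, these could be wrapped in UDFs (for example `udf(ConfluentHeader.describe _)` applied to `col("value")`) and written to the console sink to check what each record actually starts with.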
Any help would be highly appreciated.