I'm trying the code below in Spark 2.4.3 to read Avro messages from Kafka.

The schema is stored in the Confluent Schema Registry when the data is published to Kafka. I have tried a few solutions that have already been discussed here (Integrating Spark Structured Streaming with the Confluent Schema Registry / Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)) but couldn't make them work, and I could not find a proper way of doing this when the schema is stored in a Schema Registry.

Here is the code I'm currently trying. I do get a result, but every record comes out as null, even though the topic contains data. Could someone please help me with this?

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.avro.SchemaConverters

object ScalaSparkAvroConsumer {

    private val topic = "customer.v1"
    private val kafkaUrl = "localhost:9092"
    private val schemaRegistryUrl = "http://127.0.0.1:8081"

    private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    private val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

    private val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
    // Never reassigned, so a val is sufficient
    private val sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

    def main(args: Array[String]): Unit = {
      // getSparkSession() is a helper that is not shown in this snippet
      val spark = getSparkSession()

      spark.sparkContext.setLogLevel("ERROR")

      spark.udf.register("deserialize", (bytes: Array[Byte]) =>
        DeserializerWrapper.deserializer.deserialize(bytes)
      )

      val df = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaUrl)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .load()

      val valueDataFrame = df.selectExpr("""deserialize(value) AS message""")

      import org.apache.spark.sql.functions._

      val formattedDataFrame = valueDataFrame.select(
        from_json(col("message"), sparkSchema.dataType).alias("parsed_value"))
        .select("parsed_value.*")

      formattedDataFrame
        .writeStream
        .format("console")
        .option("truncate", false)
        .start()
        .awaitTermination()
    }

    object DeserializerWrapper {
      val deserializer = kafkaAvroDeserializer
    }

    class AvroDeserializer extends AbstractKafkaAvroDeserializer {
      def this(client: SchemaRegistryClient) {
        this()
        this.schemaRegistry = client
      }

      override def deserialize(bytes: Array[Byte]): String = {
        val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
        genericRecord.toString
      }
    }
}
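
For context, `AbstractKafkaAvroDeserializer` expects each message value in the Confluent wire format: a magic byte of 0, a 4-byte big-endian schema ID, and then the Avro binary payload. The sketch below reads that header by hand, which may help confirm which registered schema a record was written with (it is not necessarily the latest version fetched above). `ConfluentHeader` and its methods are hypothetical names for illustration, not part of any library.

```scala
import java.nio.ByteBuffer

object ConfluentHeader {
  // Confluent wire format: 1 magic byte (0), 4-byte big-endian schema ID, then the Avro payload.
  private val MagicByte: Byte = 0
  private val HeaderSize = 5

  /** Returns the schema ID, or None if the bytes do not look like Confluent-framed Avro. */
  def schemaId(bytes: Array[Byte]): Option[Int] =
    if (bytes == null || bytes.length < HeaderSize || bytes(0) != MagicByte) None
    else Some(ByteBuffer.wrap(bytes, 1, 4).getInt) // ByteBuffer reads big-endian by default

  /** Returns the Avro-encoded payload that follows the 5-byte header. */
  def payload(bytes: Array[Byte]): Array[Byte] = bytes.drop(HeaderSize)
}

object ConfluentHeaderDemo extends App {
  // Hypothetical record: magic byte, schema ID 42, then two payload bytes.
  val framed = Array[Byte](0, 0, 0, 0, 42, 0x02, 0x41)
  println(ConfluentHeader.schemaId(framed))
  println(ConfluentHeader.payload(framed).length)
}
```

Registering `schemaId` as a UDF and selecting it alongside `value` would show whether all records share one schema ID.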

The output is as follows:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-------+
|header|control|
+------+-------+
|null  |null   |
|null  |null   |
|null  |null   |
|null  |null   |
+------+-------+
only showing top 20 rows        
Jacek Laskowski
Leibnitz
  • Possible duplicate of [Integrating Spark Structured Streaming with the Confluent Schema Registry](https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry) – OneCricketeer Nov 27 '19 at 11:00
  • I already tried out those as I stated in the description and couldn't make it work. Can you please advise me on this? – Leibnitz Nov 27 '19 at 17:28
  • I wrote the answer there and can verify it worked for me. If you get null, it's likely that the generated schema didn't align with the record content. The answer there didn't use `.asInstanceOf[GenericRecord]`, for example – OneCricketeer Nov 27 '19 at 21:51
  • Can you check what's inside `valueDataFrame`? Can you do `valueDataFrame.writeStream.format("console")`? And just to make it easier to debug, use `read` (Spark SQL) not `readStream` (Structured Streaming) until it gives you proper values. – Jacek Laskowski Nov 28 '19 at 15:55
  • Yes, I used `read` and it gave me the actual message as this. `{"header": {"Id": "123"},"control": {"subject": "EOD"}}` – Leibnitz Nov 28 '19 at 16:15
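
Following the debugging suggestion in the comments, one way to isolate the problem is to run `from_json` on the sample message outside of Kafka. The sketch below assumes a hand-written schema that mirrors the sample message (it is not the schema from the registry), a local Spark session, and `spark-sql` on the classpath; `FromJsonCheck` is a hypothetical name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FromJsonCheck {
  // Hand-written schema mirroring the sample message; an assumption, not the registry schema.
  val schema: StructType = StructType(Seq(
    StructField("header", StructType(Seq(StructField("Id", StringType)))),
    StructField("control", StructType(Seq(StructField("subject", StringType))))
  ))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("from-json-check")
      .getOrCreate()
    import spark.implicits._

    // The message text reported in the comments, held in a static DataFrame.
    val sample = Seq("""{"header": {"Id": "123"},"control": {"subject": "EOD"}}""")
      .toDF("message")

    // Same from_json step as in the question, but with the hand-written schema.
    sample
      .select(from_json(col("message"), schema).alias("parsed_value"))
      .select("parsed_value.*")
      .show(false)

    spark.stop()
  }
}
```

If this prints values while the streaming job prints nulls, comparing `schema` with `sparkSchema.dataType` from the question may show where the schema derived from the registry differs from the message content.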

1 Answer

ABRiS, a library that integrates Avro serialization, the Confluent Schema Registry, and Spark Structured Streaming, provides from_confluent_avro(), which will make your life easier. You can find it here:

https://github.com/AbsaOSS/ABRiS

Abdulrahman