I'm using Spark Structured Streaming with the Kafka integration to read Avro messages and deserialize them. The goal is to read these messages using a generated POJO as the schema. The code I'm using is the following:

  val kafkaConsumerDf: DataFrame = sparkSession
    .readStream
    .format("kafka")
    .option("subscribe", inputTopic)
    .option("group.id", queryName)
    .option("startingOffsets", "earliest")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .load()

  kafkaConsumerDf
    .writeStream
    .queryName(queryName)
    .option("checkpointLocation", checkpointPath)
    .foreachBatch((batchDF: DataFrame, batchId: Long) => {

      // Deserialize the Avro payload and flatten the struct into top-level columns.
      val deserializedDf: DataFrame = batchDF.select(
        from_avro(col("value"), schemaRegistryConfig) as "value"
      ).select("value.*")

    }).start()
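
For completeness, here are the imports the snippet relies on. A from_avro that accepts a schema-registry configuration is, as far as I know, the ABRiS variant (Spark's built-in from_avro takes a JSON schema string instead), so the ABRiS import below is an assumption on my part:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
// Assumption: from_avro(column, config) is the ABRiS function and
// schemaRegistryConfig is an ABRiS configuration built elsewhere.
import za.co.absa.abris.avro.functions.from_avro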

The schema of the data is as follows:

{
  "fields": [
    {
      "name": "idA",
      "type": "string"
    },
    {
      "name": "idB",
      "type": "string"
    },
    {
      "name": "idC",
      "type": "string"
    },
    {
      "name": "name",
      "type": [
        "string",
        "null"
      ]
    }
  ],
  "name": "Avro",
  "namespace": "com.test.avro",
  "type": "record"
}
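
For reference, the ["string", "null"] union on the name field is what makes it nullable. If I understand the Avro-to-Spark conversion correctly, select("value.*") should therefore yield columns shaped like this (my reading, not verified output):

import org.apache.spark.sql.types._

// Expected shape of the flattened DataFrame: the three ids are required
// strings, while name is nullable because of its ["string", "null"] union.
val expectedSchema: StructType = StructType(Seq(
  StructField("idA", StringType, nullable = false),
  StructField("idB", StringType, nullable = false),
  StructField("idC", StringType, nullable = false),
  StructField("name", StringType, nullable = true)
))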

As I stated above, I want to use an Avro-generated POJO as the schema for the data consumed from Kafka. The POJO that represents the data structure is:

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */

import scala.annotation.switch

final case class Avro(var idA: String, var idB: String, var idC: String, var name: Option[String]) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this("", "", "", None)
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => {
        idA
      }.asInstanceOf[AnyRef]
      case 1 => {
        idB
      }.asInstanceOf[AnyRef]
      case 2 => {
        idC
      }.asInstanceOf[AnyRef]
      case 3 => {
        name match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case 0 => this.idA = {
        value.toString
      }.asInstanceOf[String]
      case 1 => this.idB = {
        value.toString
      }.asInstanceOf[String]
      case 2 => this.idC = {
        value.toString
      }.asInstanceOf[String]
      case 3 => this.name = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }
  def getSchema: org.apache.avro.Schema = Avro.SCHEMA$
}

object Avro {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro\",\"namespace\":\"com.test.avro\",\"fields\":[{\"name\":\"idA\",\"type\":\"string\"},{\"name\":\"idB\",\"type\":\"string\"},{\"name\":\"idC\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"string\",\"null\"]}]}")
}
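
As an aside, I know the companion's SCHEMA$ could be passed to Spark's built-in from_avro, which takes a JSON schema string rather than a registry configuration. But that still relies on from_avro, and I believe it would fail on Confluent-framed messages because of the 5-byte magic-byte/schema-id prefix the registry serializer adds, so it is not what I am after:

// Sketch only: Spark's built-in from_avro with the generated schema string.
// Likely breaks on Confluent wire-format payloads (5-byte prefix before
// the actual Avro bytes).
import org.apache.spark.sql.avro.functions.from_avro

val withBuiltinFromAvro = batchDF.select(
  from_avro(col("value"), Avro.SCHEMA$.toString) as "value"
).select("value.*")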

So, instead of using the from_avro() function and the schema registry, is it possible to use this POJO to deserialize the data? For example:

val deserializedDf = batchDF.
  ...
  ...
  .as[Avro]
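
To make the idea concrete, this is roughly what I am hoping for, assuming Spark can derive a Product encoder for the generated case class (a sketch, not tested code):

import org.apache.spark.sql.Dataset
import sparkSession.implicits._  // encoder derivation for case classes

// Hoped-for result: deserialize, flatten, and get a typed Dataset[Avro].
val typedDs: Dataset[Avro] = batchDF
  .select(from_avro(col("value"), schemaRegistryConfig) as "value")
  .select("value.*")
  .as[Avro]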

Do you have any ideas?
