I'm using Spark Structured Streaming with the Kafka integration to read and deserialize Avro messages. The goal is to read these messages using a generated POJO as the schema. The code I'm using is the following:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
// from_avro and schemaRegistryConfig come from the schema-registry-aware
// Avro integration in use (for example ABRiS)

val kafkaConsumerDf: DataFrame = sparkSession
  .readStream
  .format("kafka")
  .option("subscribe", inputTopic)
  .option("group.id", queryName)
  .option("startingOffsets", "earliest")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .load()

kafkaConsumerDf
  .writeStream
  .queryName(queryName)
  .option("checkpointLocation", checkpointPath)
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    // Decode the binary Kafka payload into a struct, then flatten it
    val deserializedDf: DataFrame = batchDF
      .select(from_avro(col("value"), schemaRegistryConfig) as "value")
      .select("value.*")
  })
  .start()
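For context, batchDF always carries the fixed columns produced by Spark's Kafka source; the Avro payload arrives in the binary value column, which is why it has to be decoded first:

batchDF.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)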
The schema of the data is as follows:
{
  "fields": [
    {
      "name": "idA",
      "type": "string"
    },
    {
      "name": "idB",
      "type": "string"
    },
    {
      "name": "idC",
      "type": "string"
    },
    {
      "name": "name",
      "type": [
        "string",
        "null"
      ]
    }
  ],
  "name": "Avro",
  "namespace": "com.test.avro",
  "type": "record"
}
As I stated above, I want to use a POJO (the generated Avro case class) as the schema to read the data consumed from Kafka. The class that represents the data structure is:
/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.test.avro

import scala.annotation.switch

final case class Avro(var idA: String, var idB: String, var idC: String, var name: Option[String]) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this("", "", "", None)

  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => {
        idA
      }.asInstanceOf[AnyRef]
      case 1 => {
        idB
      }.asInstanceOf[AnyRef]
      case 2 => {
        idC
      }.asInstanceOf[AnyRef]
      case 3 => {
        name match {
          case Some(x) => x
          case None    => null
        }
      }.asInstanceOf[AnyRef]
      case _ => throw new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }

  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case 0 => this.idA = {
        value.toString
      }.asInstanceOf[String]
      case 1 => this.idB = {
        value.toString
      }.asInstanceOf[String]
      case 2 => this.idC = {
        value.toString
      }.asInstanceOf[String]
      case 3 => this.name = {
        value match {
          case null => None
          case _    => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case _ => throw new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }

  def getSchema: org.apache.avro.Schema = Avro.SCHEMA$
}

object Avro {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro\",\"namespace\":\"com.test.avro\",\"fields\":[{\"name\":\"idA\",\"type\":\"string\"},{\"name\":\"idB\",\"type\":\"string\"},{\"name\":\"idC\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"string\",\"null\"]}]}")
}
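For illustration (sample values invented by me), a message matching the schema above maps onto this class like so:

val sample = Avro(
  idA = "a-1",
  idB = "b-2",
  idC = "c-3",
  name = Some("example") // the ["string", "null"] union becomes Option[String]
)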
So, instead of using the from_avro() function and the Schema Registry, is it possible to use this POJO to deserialize the data? For example:
val deserializedDf = batchDF
  ...
  ...
  .as[Avro]
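More concretely, a minimal sketch of what I would like to end up with, assuming Spark can derive an Encoder for the generated case class (same names as in the code above):

import org.apache.spark.sql.Dataset
import sparkSession.implicits._ // provides Encoder derivation for case classes

// Hypothetical target: decode the Avro payload, flatten it, and map the
// columns onto the generated case class (Option[String] <-> nullable string)
val deserializedDs: Dataset[Avro] = batchDF
  .select(from_avro(col("value"), schemaRegistryConfig) as "value")
  .select("value.*")
  .as[Avro]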
Do you have any ideas?