I'm using Spark Structured Streaming as described on this page.
I receive the messages from the Kafka topic correctly, but the value is in Avro format. Is there a way to deserialize the Avro records (something like the KafkaAvroDeserializer approach)?
Spark >= 2.4
You can use the from_avro function from the spark-avro library. It is an external module, so it has to be added to the application's dependencies.
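import org.apache.spark.sql.avro._ // on Spark 3.x: import org.apache.spark.sql.avro.functions._
// Avro schema of the value column, as a JSON string
val schema: String = ???
df.withColumn("value", from_avro($"value", schema))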
Spark < 2.4
Define a function which takes Array[Byte] (the serialized object):
import scala.reflect.runtime.universe.TypeTag
def decode[T : TypeTag](bytes: Array[Byte]): T = ???
This function should deserialize the Avro data and return an object that can be stored in a Dataset.
Create a udf based on the function:
import org.apache.spark.sql.functions.udf
val decodeUdf = udf(decode _)
Call the udf on the value column:
val df = spark
  .readStream
  .format("kafka")
  ...
  .load()
df.withColumn("value", decodeUdf($"value"))
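The decode function above is left as ???. As a rough sketch of what its body could look like, the following uses Avro's generic API (GenericDatumReader and a binary decoder) to turn plain Avro-encoded bytes into a case class. The User case class, the AvroDecode object, and the schema are hypothetical and only serve as an illustration. It also assumes the value holds bare Avro binary with no extra framing bytes in front of the record.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Hypothetical target type; a case class can be stored in a Dataset.
case class User(name: String, age: Int)

object AvroDecode {
  // Hypothetical writer schema; replace with the schema the producer uses.
  val schemaJson: String =
    """{"type":"record","name":"User","fields":[
      |  {"name":"name","type":"string"},
      |  {"name":"age","type":"int"}
      |]}""".stripMargin

  // Parsed lazily, once per JVM.
  lazy val schema: Schema = new Schema.Parser().parse(schemaJson)

  // Decodes one Avro binary record into a User.
  def decode(bytes: Array[Byte]): User = {
    val reader  = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val record  = reader.read(null, decoder)
    // Avro strings come back as Utf8, hence toString.
    User(record.get("name").toString, record.get("age").asInstanceOf[Int])
  }
}
```

With Spark on the classpath, udf(AvroDecode.decode _) should then produce a struct column with name and age fields when applied to value. Using a concrete return type instead of a generic T keeps the udf's result schema unambiguous.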