
I'm using Spark Structured Streaming as described on this page.

I get the correct messages from the Kafka topic, but the value is in Avro format. Is there a way to deserialize Avro records (something like the KafkaAvroDeserializer approach)?

zero323
bajky
  • Is the schema in the message? Or is it generated by Confluent serializers with only a schema ID? – OneCricketeer Feb 10 '18 at 00:51
  • I'm using schema registry. – bajky Feb 11 '18 at 23:35
  • Is there a specific reason you're not using Kafka Streams instead? Also, as the Spark documentation says, you need to deserialize values with DataFrame operations, because the ByteArrayDeserializer is always used https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations – OneCricketeer Feb 12 '18 at 05:05
  • Several posts I've found just use normal Spark Streaming. https://stackoverflow.com/questions/41193764/handling-schema-changes-in-running-spark-streaming-application – OneCricketeer Feb 12 '18 at 05:15
  • Check out this answer https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-kafka-schema-registry/49182004#49182004 Hope it helps – tstites May 31 '18 at 13:45

1 Answer


Spark >= 2.4

You can use the from_avro function from the spark-avro library.

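import org.apache.spark.sql.avro._

// Avro schema of the record, as a JSON string
val schema: String = ???
// Replace the binary value column with the decoded Avro record
df.withColumn("value", from_avro($"value", schema))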
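
Since the comments mention the schema registry: Confluent serializers prepend a 5-byte header (one magic byte equal to 0, then a 4-byte big-endian schema ID) to the Avro payload, and from_avro expects plain Avro binary, so the header most likely has to be dropped first. A minimal sketch of parsing that header in plain Scala; the names ConfluentMessage and parseConfluent are made up for illustration and are not part of any library:

```scala
import java.nio.ByteBuffer

// Hypothetical holder for the two parts of a Confluent-framed message
final case class ConfluentMessage(schemaId: Int, payload: Array[Byte])

// Splits a Confluent-framed message into schema ID and raw Avro bytes.
// Assumed layout: [magic byte 0][4-byte big-endian schema ID][Avro binary]
def parseConfluent(bytes: Array[Byte]): ConfluentMessage = {
  require(bytes != null && bytes.length >= 5, "message too short for a Confluent header")
  val buffer = ByteBuffer.wrap(bytes) // ByteBuffer is big-endian by default
  val magic = buffer.get()
  require(magic == 0, s"unexpected magic byte: $magic")
  val schemaId = buffer.getInt()
  // Everything after the header is the Avro-encoded record
  val payload = new Array[Byte](buffer.remaining())
  buffer.get(payload)
  ConfluentMessage(schemaId, payload)
}
```

One way to use it would be to wrap it in a udf such as udf((b: Array[Byte]) => parseConfluent(b).payload), apply that to the value column, and pass the result to from_avro. Looking up the writer schema in the registry by schemaId is not covered here.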

Spark < 2.4

  • Define a function which takes Array[Byte] (serialized object):

    import scala.reflect.runtime.universe.TypeTag
    
    def decode[T : TypeTag](bytes: Array[Byte]): T = ???
    

    which deserializes the Avro data and creates an object that can be stored in a Dataset.

  • Create a udf based on the function:

    import org.apache.spark.sql.functions.udf

    val decodeUdf = udf(decode _)
    
  • Call the udf on the value column:

    val df = spark
      .readStream
      .format("kafka")
      ...
      .load()
    
    df.withColumn("value", decodeUdf($"value"))
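
The decode function left as ??? above might be filled in along these lines. This is only a sketch, assuming the value bytes are plain Avro binary (no Confluent header) and that the Avro library is on the classpath. The User schema and case class, and the names UserDecoder and decodeUser, are hypothetical and not from the original answer:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.sql.functions.udf

// Hypothetical target type; a case class can be returned from a udf as a struct
case class User(name: String, age: Int)

object UserDecoder {
  // Hypothetical writer schema, for illustration only
  private val schemaJson: String =
    """{"type":"record","name":"User","fields":[
      |  {"name":"name","type":"string"},
      |  {"name":"age","type":"int"}
      |]}""".stripMargin

  // Parsed lazily so each executor JVM builds its own copy
  @transient private lazy val schema: Schema = new Schema.Parser().parse(schemaJson)

  // Decodes one Avro-encoded record into the case class
  def decode(bytes: Array[Byte]): User = {
    // A new reader per call keeps the sketch simple; it could be cached
    val reader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val record = reader.read(null, decoder)
    // Avro strings arrive as Utf8, hence toString; ints arrive boxed
    User(record.get("name").toString, record.get("age").asInstanceOf[Int])
  }
}

// Concrete udf: binary column in, struct<name: string, age: int> out
val decodeUser = udf(UserDecoder.decode _)
```

With this in place, df.withColumn("value", decodeUser($"value")) should yield a struct column whose fields can be selected as value.name and value.age.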
    
zero323
Alper t. Turker