I am trying to consume from a Kafka topic whose messages have been serialized with Avro, and I need to figure out how to deserialize them when consuming from a Spark Structured Streaming app.
Note: I need a solution in Python.
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("De-Serializing")\
    .getOrCreate()
data = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .option("startingOffsets", "earliest")\
    .load()
# Do stuff here: the Kafka `value` column holds the raw Avro bytes (binary type) that need decoding
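query = data.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()

# awaitTermination() returns None, so call it on the query instead of chaining it into the assignment
query.awaitTermination()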
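A minimal sketch of one way to fill in the "Do stuff here" step, assuming Spark 3.x with the `spark-avro` package on the classpath (e.g. `--packages org.apache.spark:spark-avro_2.12:<your Spark version>`, matching your Scala build) and that you have the writer's Avro schema as a JSON string. `from_avro` from `pyspark.sql.avro.functions` decodes the binary `value` column into a struct. The schema, the record fields, and the `with_decoded_value` helper are hypothetical placeholders, not something taken from the question.

```python
import json

# Hypothetical reader schema; replace it with the Avro schema the producer used.
VALUE_SCHEMA = json.dumps({
    "type": "record",
    "name": "MyRecord",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
    ],
})


def with_decoded_value(kafka_df, schema_json=VALUE_SCHEMA, confluent_framed=False):
    """Decode the binary Kafka `value` column into one column per Avro field.

    Set confluent_framed=True if the producer used Confluent Schema Registry
    serializers, which prepend a 5-byte header (magic byte + schema ID).
    """
    # Imported inside the function so this module can be loaded without PySpark.
    from pyspark.sql.avro.functions import from_avro
    from pyspark.sql.functions import col, expr

    payload = col("value")
    if confluent_framed:
        # substring() on binary is 1-based: start at byte 6 to skip the 5-byte header.
        payload = expr("substring(value, 6, length(value) - 5)")
    return (
        kafka_df
        .select(from_avro(payload, schema_json).alias("record"))
        .select("record.*")
    )
```

With this helper, `decoded = with_decoded_value(data)` would replace the placeholder comment, and the stream would be started from `decoded.writeStream` rather than `data.writeStream`. If the schema does not match what the producer wrote, expect decoding errors rather than silently wrong columns.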
Can anyone help me?
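If the topic was written with Confluent's Avro serializer (an assumption; the question doesn't say which producer was used), each message is not plain Avro: it starts with a magic byte `0` and a 4-byte big-endian Schema Registry ID, followed by the Avro binary body. This pure-Python sketch, using a hypothetical helper `split_confluent_frame`, shows that layout; in Spark the same 5 bytes can be skipped with `expr("substring(value, 6, length(value) - 5)")` before calling `from_avro`.

```python
import struct
from typing import Tuple


def split_confluent_frame(message: bytes) -> Tuple[int, bytes]:
    """Split a Confluent wire-format value into (schema_id, avro_body)."""
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not Confluent wire format: missing magic byte 0")
    # Bytes 1..4 hold the Schema Registry ID as a big-endian unsigned int.
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]


# Illustrative message: schema ID 42, then an Avro body encoding the
# hypothetical record {"id": 1, "name": "foo"} (zigzag varints 0x02 and 0x06).
frame = b"\x00" + struct.pack(">I", 42) + b"\x02\x06foo"
print(split_confluent_frame(frame))  # (42, b'\x02\x06foo')
```

Once the header is removed, the remaining bytes are plain Avro binary that can be decoded with the writer's schema.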