
I am trying to consume from a Kafka topic whose messages have been serialized to Avro, and I need to figure out how to deserialize them when consuming from a Spark Structured Streaming app.

Note: I need a solution in Python.

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("De-Serializing")\
    .getOrCreate()

data = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .option("startingOffsets", "earliest")\
    .load()

# TODO: deserialize the Avro payload in the "value" column here

query = data.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()\
    .awaitTermination()

Can anyone help me?
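For context: if the topic was produced with Confluent's Avro serializer (as the comments below suggest), each message value carries a 5-byte framing header (a magic byte of 0 followed by a 4-byte big-endian schema-registry ID) before the actual Avro body. A minimal plain-Python sketch of splitting that header off (hypothetical helper name; actually decoding the body would still require an Avro library and the writer schema):

```python
import struct

def strip_confluent_header(payload: bytes):
    """Split a Confluent-framed Kafka value into (schema_id, avro_body)."""
    # Confluent wire format: magic byte 0, then a 4-byte big-endian schema ID,
    # then the Avro-encoded record bytes.
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != 0:
        raise ValueError("payload is not Confluent-framed Avro")
    return schema_id, payload[5:]
```

In a Structured Streaming job this could run inside a UDF over the `value` column; the returned schema ID would then be used to look up the writer schema in the registry before decoding the body.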

  • It's more possible in Scala / Java because the Confluent libraries are more easily importable that way. https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry – OneCricketeer Mar 21 '19 at 19:47
  • @cricket_007, unfortunately the app I am working on is all in python. It's okay though, I went with another approach. – anonuser1234 Mar 22 '19 at 17:54
  • It's possible to use any Java library in PySpark. It just doesn't look clean, and I haven't done it for this use-case in particular – OneCricketeer Mar 22 '19 at 18:10

0 Answers