I want to read Kafka records serialized in Avro format using PySpark. I also fetch the Avro schema from the Confluent Schema Registry (as a schema string).
I'm able to read from Kafka (it returns the Kafka metadata columns such as key, value, topic, partition, offset, timestamp, and timestampType), but I want to deserialize the values and flatten them. How do I deserialize and flatten the value column into a PySpark DataFrame?
Here is the code:
df_kafka = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "bootstrap_servers") \
    .option("subscribePattern", "topic") \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", "cert.jks") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .option("mode", "PERMISSIVE") \
    .load()