
The problem

I'm using Spark on AWS Glue to read Confluent-encoded Avro records from a Kafka topic. I first tried Spark's from_avro function, but without success, because Confluent's Avro-encoded messages are not in the format this function expects.

from pyspark.sql.avro.functions import from_avro

# spark, group_id, client_id, truststore, keystore, password and
# jsonFormatSchema are all defined earlier in the Glue job
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1") \
  .option("subscribe", "topic") \
  .option("kafka.group.id", group_id) \
  .option("kafka.client.id", client_id) \
  .option("kafka.ssl.truststore.location", truststore) \
  .option("kafka.ssl.truststore.password", password) \
  .option("kafka.ssl.keystore.location", keystore) \
  .option("kafka.ssl.keystore.password", password) \
  .option("startingOffsets", "earliest") \
  .load()

output = df.select(from_avro("value", jsonFormatSchema, options={"mode":"PERMISSIVE"}).alias("data"))
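For context, my understanding is that Confluent-encoded records are not plain Avro: each value starts with a magic byte (0x00) and a 4-byte schema registry ID, and only then the Avro body, which is why from_avro cannot parse the raw value column. Below is a minimal sketch of the kind of decoding I think is needed, assuming every record was written with the same schema as jsonFormatSchema (the substring offsets and the avro_body name are my own):

from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import expr

# skip the 5-byte Confluent header (magic byte + 4-byte schema ID)
# so from_avro only sees the raw Avro payload
avro_body = expr("substring(value, 6, length(value) - 5)")

output = df.select(
    from_avro(avro_body, jsonFormatSchema, options={"mode": "PERMISSIVE"}).alias("data")
)

Note that this ignores the schema ID in each record, so it only works if all messages on the topic were produced with a schema compatible with jsonFormatSchema.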
