
I tried to publish records from a dataframe built from an Avro file, and likewise from a dataframe built from a CSV file. I published the data into a Kafka topic in Avro format using to_avro(struct(*)) on the dataframe, and I was able to view the binary data in the Kafka topic.
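For context, the producer side looks roughly like this. This is a minimal sketch, assuming a CSV source; the file path, bootstrap servers, and topic name are placeholders:

```python
from pyspark.sql import SparkSession, functions as fn
from pyspark.sql.avro.functions import to_avro  # requires the spark-avro package

spark = SparkSession.builder.getOrCreate()

# Read the source dataframe; the CSV path here is a placeholder.
df = spark.read.csv("examples/src/main/resources/users.csv", header=True)

# Pack every column into one struct, serialize it to binary Avro, and
# write it as the Kafka record value.
(df.select(to_avro(fn.struct(*df.columns)).alias("value"))
   .write
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("topic", "topic1")
   .save())
```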

When I deserialize using this code:

```python
from pyspark.sql.avro.functions import from_avro

# `from_avro` requires the Avro schema as a JSON string.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

df = (spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load())

# Decode the binary Avro data in the Kafka `value` column into a struct.
output = df.select(from_avro("value", jsonFormatSchema).alias("user"))
```

When we fetch the data from `output` using show() or display, it throws an exception along the lines of "Spark exception: unable to parse, use mode PERMISSIVE". We already used that mode while reading the CSV data and applied the same mode while reading from Kafka. Can anyone help me fix this issue and parse the Avro data?
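For what it's worth, a minimal sketch of applying that mode at the right place, assuming Spark 3.x: `from_avro` accepts its own options dict, and `mode=PERMISSIVE` makes it return null structs for malformed records instead of failing the query (FAILFAST is `from_avro`'s default, independent of any option set on the Kafka reader):

```python
from pyspark.sql.avro.functions import from_avro

# Pass the parse mode to from_avro itself, not to the Kafka source.
output = df.select(
    from_avro("value", jsonFormatSchema, {"mode": "PERMISSIVE"}).alias("user")
)

# A streaming dataframe cannot be inspected with show(); write it to the
# console sink instead to see whether records decode as data or as null.
query = (output.writeStream
    .format("console")
    .option("truncate", "false")
    .start())
query.awaitTermination()
```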

  • Does the binary data actually contain the schema in each event? – OneCricketeer Jun 21 '21 at 20:56
  • I am publishing the data from a dataframe built from an Avro file using to_avro(struct(*)) – Aravind Jun 22 '21 at 08:33
  • Sure, but this is really meant for Avro **files**, IMO, despite what the Spark-Avro documentation shows. You should really write your own Serializer, and ideally [using a Schema Registry](https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry) – OneCricketeer Jun 22 '21 at 16:16
  • 1
    I some how able to deserialize but the constraint is it is working if we send data with schema Can we fetch avro data with out sending schema with record while publishing – Aravind Jun 23 '21 at 13:29
  • You need the schema somewhere, ideally as part of each record so the deserializer can determine how to convert it, but that isn't strictly necessary if you can guarantee all records being produced will always have the same schema that the consumer can read (though that's hard to enforce). [The Confluent way of serializing the data](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format) is the only way I'm familiar with, and it sends an ID with each message rather than the full schema text (see the sketch after these comments). I'm not really familiar with Spark's `to_avro` binary format – OneCricketeer Jun 23 '21 at 15:42
  • Thanks for the info, I will try to explore the Schema Registry – Aravind Jun 24 '21 at 16:27
  • This might help you https://rohitsm.com/serializing-spark-dataframes-to-avro/ – OneCricketeer Jun 24 '21 at 17:17
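Following up on the Schema Registry discussion above: with the Confluent serializer, each Kafka value starts with a 5-byte header (one magic byte plus a 4-byte schema ID) before the Avro body. A minimal sketch of stripping that header before `from_avro`, assuming every record was written with the single schema in `jsonFormatSchema` (no per-record schema lookup):

```python
from pyspark.sql import functions as fn
from pyspark.sql.avro.functions import from_avro

# Confluent wire format: byte 0 is the magic byte, bytes 1-4 are the
# schema ID; the Avro body starts at byte 5, i.e. 1-indexed position 6
# for Spark's substring. This only works if all records share the one
# schema given in jsonFormatSchema.
payload = fn.expr("substring(value, 6, length(value) - 5)")

output = df.select(from_avro(payload, jsonFormatSchema).alias("user"))
```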
