I'm trying to read from a Confluent Kafka topic using PySpark, but I'm getting an error about malformed records. I'm using the Avro schema from the Confluent Schema Registry; here is my code:

import json

from confluent_kafka.schema_registry import SchemaRegistryClient
from pyspark.sql.avro.functions import from_avro

# Schema Registry client configuration
conf = {
    "url": schema_registry_url,
    "ssl.ca.location": "cert.pem"
}

sr_client = SchemaRegistryClient(conf)

# Fetch the latest registered schema for the subject
schema = sr_client.get_version(schema_subject, "latest")
avro_schema = schema.schema.schema_str
avro_schema_from_registry = json.loads(avro_schema)
schema_id = schema.version
# use the following function to convert the Avro types to a Spark schema
spark_schema = avro_schema_to_pyspark_schema(avro_schema_from_registry)

df_kafka = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "bootstrap_servers") \
  .option("subscribePattern", "topic") \
  .option("kafka.security.protocol","SSL") \
  .option("kafka.ssl.truststore.location", "certs.jks") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .option("mode", "PERMISSIVE") \
  .load()

df_kafka = df_kafka.select(from_avro(df_kafka["value"], avro_schema).alias("data"))

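One thing I suspect, but have not confirmed, is the Confluent wire format: records written by the Confluent serializers carry a 5-byte header (one magic byte plus a 4-byte schema ID) before the Avro payload, which plain from_avro does not expect. Below is a minimal sketch of the workaround I was considering; the payload column name is mine, and the substring offsets assume the standard wire format:

from pyspark.sql.functions import expr
from pyspark.sql.avro.functions import from_avro

# Strip the 5-byte Confluent header (magic byte + 4-byte schema id)
# before handing the payload to from_avro. Spark SQL substring is 1-based.
df_payload = df_kafka.withColumn(
    "payload", expr("substring(value, 6, length(value) - 5)")
)
df_decoded = df_payload.select(
    from_avro(df_payload["payload"], avro_schema).alias("data")
)
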
Can anyone help with this issue? Thanks.
