I'm trying to read from a Confluent Kafka topic using PySpark, but I'm getting an error about malformed records. I'm using the Avro schema stored in the Confluent Schema Registry; here is my code:
import json
from confluent_kafka.schema_registry import SchemaRegistryClient
from pyspark.sql.avro.functions import from_avro

conf = {
    "url": schema_registry_url,
    "ssl.ca.location": "cert.pem"
}
sr_client = SchemaRegistryClient(conf)
# fetch the latest version of the schema registered under the subject
schema = sr_client.get_version(schema_subject, "latest")
avro_schema = schema.schema.schema_str
avro_schema_from_registry = json.loads(avro_schema)
schema_id = schema.schema_id
# convert the Avro field types to a Spark schema (a simplified version of this helper is shown below)
spark_schema = avro_schema_to_pyspark_schema(avro_schema_from_registry)
df_kafka = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "bootstrap_servers") \
    .option("subscribePattern", "topic") \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", "certs.jks") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .option("mode", "PERMISSIVE") \
    .load()
# deserialize the Avro payload in the "value" column using the schema string from the registry
df_kafka = df_kafka.select(from_avro(df_kafka["value"], avro_schema).alias("data"))
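In case it is relevant, avro_schema_to_pyspark_schema is roughly the following (a simplified sketch that only maps primitive Avro types and simple nullable unions; the actual function is a bit more involved):

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType,
    FloatType, DoubleType, BooleanType, BinaryType
)

# mapping of primitive Avro types to Spark SQL types
AVRO_TO_SPARK = {
    "string": StringType(),
    "int": IntegerType(),
    "long": LongType(),
    "float": FloatType(),
    "double": DoubleType(),
    "boolean": BooleanType(),
    "bytes": BinaryType(),
}

def avro_schema_to_pyspark_schema(avro_schema):
    # build a StructType from the "fields" list of the Avro record schema
    fields = []
    for field in avro_schema["fields"]:
        avro_type = field["type"]
        # a nullable union like ["null", "string"] -> take the non-null branch
        if isinstance(avro_type, list):
            avro_type = next(t for t in avro_type if t != "null")
        fields.append(StructField(field["name"], AVRO_TO_SPARK[avro_type], True))
    return StructType(fields)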
Can anyone help me figure out why the records come back as malformed? Thanks.