1

I have streamed data in Avro format in Kafka storage and managed the schema of the data via the confluent schema registry.

I'd like to pull the data using pyspark and parse the Avro byte data using schema from schema registry but it kept raising errors.

Below are code:

import json
from schema_registry.client import SchemaRegistryClient

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1,io.delta:delta-core_2.12:1.1.0,org.apache.spark:spark-avro_2.12:3.2.1 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog pyspark-shell'


topic_name = "my_topic"

bootstrap_server = "pkc-XXXXy.us-east-1.aws.confluent.cloud:9092"
kafka_options = {
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="MY_USERNAME" password="MY_PASSWORD";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol" : "SASL_SSL",
    "kafka.bootstrap.servers": bootstrap_server,
    "group.id": "group_wow2222",
    "subscribe": topic_name,
    "startingOffsets": "earliest",
    "maxOffsetsPerTrigger": 5,
}

log_streaming_df = spark \
  .readStream \
  .format("kafka") \
  .options(**kafka_options) \
  .load()

sr_client = SchemaRegistryClient(
    {
        "url": "https://psrc-XXXX.us-east-2.aws.confluent.cloud",
        "basic.auth.credentials.source": "USER_INFO",
        "basic.auth.user.info": "MY_USERNAME_FOR_SR:MY_PW_FOR_SR"
    }
)
schema_obj = sr_client.get_schema(f"{topic_name}-value", version="latest")


log_streaming_df = log_streaming_df.select(
    from_avro(
        "value",
        parse_schema(schema_obj.schema.raw_schema)
        # json.dumps(schema_obj.schema.raw_schema)
    )
).alias("logs")
log_streaming_df.writeStream \
    .format("console") \
    .option("checkpointLocation", "./my_checkpoint") \
    .outputMode("append") \
    .start()

Errors:

Aborting task                        (0 + 1) / 2]
org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
        at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:113)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:412)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:457)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:308)
        at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.spark.s

But when I tried saving stream data in parquet format to disk first, then read the data in static dataframe(not a streaming dataframe), it works like charm.

But one thing to note here: In this time, I exclude first 4 bytes(which is called the magic bytes, indicating schema id of the schema registry) and then parsed the rest using the fastavro:

df = spark.read.parquet("./temp/*.parquet")  # saved data in advance from read stream (kafka)


import pandas as pd

from fastavro import schemaless_reader, parse_schema
from io import BytesIO

schema = parse_schema(schema_obj.schema.raw_schema)

@pandas_udf(StringType())
def simple_udf(v: pd.Series) -> pd.Series: 
    return v.apply(lambda x: json.dumps(schemaless_reader(BytesIO(x[5:]), schema)))

df = df.withColumn("real_value", simple_udf(col("value")))

Result:

enter image description here

According to above result, it looks like from_avro doesn't consider the first 4bytes of data so it raises errors(malformed error).

If I'm right, how can I handle it easily without using pandas_udf?

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
user3595632
  • 5,380
  • 10
  • 55
  • 111
  • Substring function is what you're looking for. And it's 5 bytes, not 4. https://www.confluent.io/blog/consume-avro-data-from-kafka-topics-and-secured-schema-registry-with-databricks-confluent-cloud-on-azure/#step-7 – OneCricketeer Nov 30 '22 at 14:12

0 Answers0