I'm streaming Avro-encoded data into Kafka and managing its schema with the Confluent Schema Registry.
I'd like to pull the data with PySpark and decode the Avro bytes using the schema from the schema registry, but it keeps raising errors.
Here is my code:
import os
import json

from fastavro import parse_schema
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro
from schema_registry.client import SchemaRegistryClient

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1,io.delta:delta-core_2.12:1.1.0,org.apache.spark:spark-avro_2.12:3.2.1 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog pyspark-shell'

spark = SparkSession.builder.getOrCreate()
topic_name = "my_topic"
bootstrap_server = "pkc-XXXXy.us-east-1.aws.confluent.cloud:9092"
kafka_options = {
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="MY_USERNAME" password="MY_PASSWORD";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.bootstrap.servers": bootstrap_server,
    "group.id": "group_wow2222",
    "subscribe": topic_name,
    "startingOffsets": "earliest",
    "maxOffsetsPerTrigger": 5,
}
log_streaming_df = spark \
    .readStream \
    .format("kafka") \
    .options(**kafka_options) \
    .load()
sr_client = SchemaRegistryClient(
    {
        "url": "https://psrc-XXXX.us-east-2.aws.confluent.cloud",
        "basic.auth.credentials.source": "USER_INFO",
        "basic.auth.user.info": "MY_USERNAME_FOR_SR:MY_PW_FOR_SR",
    }
)
schema_obj = sr_client.get_schema(f"{topic_name}-value", version="latest")
log_streaming_df = log_streaming_df.select(
    from_avro(
        "value",
        parse_schema(schema_obj.schema.raw_schema)
        # also tried: json.dumps(schema_obj.schema.raw_schema)
    ).alias("logs")
)
log_streaming_df.writeStream \
    .format("console") \
    .option("checkpointLocation", "./my_checkpoint") \
    .outputMode("append") \
    .start()
Errors:
Aborting task
org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:113)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:412)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:457)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:308)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.spark.s
However, when I first save the streamed data to disk as Parquet and then read it back into a static (non-streaming) DataFrame, it works like a charm.
One thing to note: in that case I drop the first 5 bytes of each value (the magic byte plus the 4-byte schema id pointing at the schema registry) and parse the rest with fastavro (see the byte-layout sketch after the snippet):
import pandas as pd
from io import BytesIO
from fastavro import schemaless_reader, parse_schema
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

# saved in advance from the Kafka read stream
df = spark.read.parquet("./temp/*.parquet")

schema = parse_schema(schema_obj.schema.raw_schema)

@pandas_udf(StringType())
def simple_udf(v: pd.Series) -> pd.Series:
    # skip the 5-byte Confluent prefix, then decode the Avro payload
    return v.apply(lambda x: json.dumps(schemaless_reader(BytesIO(x[5:]), schema)))

df = df.withColumn("real_value", simple_udf(col("value")))
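For context, this is the framing I'm assuming for each Kafka value (the Confluent wire format, as far as I understand it). The helper below is only to illustrate why the UDF slices off the first 5 bytes; it is not part of my pipeline:
import struct

def split_confluent_frame(raw: bytes):
    # Confluent wire format (my understanding):
    #   byte 0    : magic byte, always 0x00
    #   bytes 1-4 : schema id as a big-endian 4-byte integer
    #   bytes 5.. : the actual Avro-encoded record
    magic, schema_id = struct.unpack(">bI", raw[:5])
    return schema_id, raw[5:]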
Result: the real_value column contains the correctly parsed records as JSON strings.
Based on the result above, it looks like from_avro does not account for the Confluent prefix (the magic byte and 4-byte schema id) at the start of each message, which is why it raises the malformed-data error.
If that's right, how can I handle this easily without resorting to a pandas_udf?
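Ideally I'm hoping for something along these lines, done entirely with Spark functions: strip the 5-byte prefix with substring and hand the rest to from_avro together with the plain JSON schema string. This is just a sketch of what I have in mind (untested, and it assumes every record in the topic was written with the same schema version; raw_kafka_df stands for the DataFrame returned by the spark.readStream(...).load() call above):
from pyspark.sql import functions as fn
from pyspark.sql.avro.functions import from_avro

# plain JSON schema string for from_avro (instead of fastavro's parsed dict)
json_schema = json.dumps(schema_obj.schema.raw_schema)

# drop the 1-byte magic + 4-byte schema id; substring is 1-based and works on binary columns
avro_payload = fn.expr("substring(value, 6, length(value) - 5)")

parsed_df = raw_kafka_df.select(
    from_avro(avro_payload, json_schema).alias("logs")
)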