I'm doing a proof of concept with Kafka, Spark, and Jupyter notebooks, and I'm having a weird issue. I'm trying to read Avro records from Kafka into PySpark. I'm using the Confluent Schema Registry to get the schema for deserializing the Avro messages. After deserializing the Avro messages in a Spark DataFrame, the resulting column is empty, and no errors are raised. The column should contain data, because when the value is cast to a string, some of the Avro fields are readable.

I have also tried this in the spark-shell in Scala (without Jupyter), and I have tried both a Docker-based Spark and a standalone install of Spark.

I followed this SO topic to get the from_avro and to_avro functions: Pyspark 2.4.0, read avro from kafka with read stream - Python

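import requests
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.column import Column, _to_java_column

jars = [
    "kafka-clients-2.0.0.jar",
    "spark-avro_2.11-2.4.3.jar",
    "spark-sql-kafka-0-10_2.11-2.4.3.jar",
]
jar_paths = ",".join(["/home/jovyan/work/jars/{}".format(jar) for jar in jars])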

conf = SparkConf()
conf.set("spark.jars", jar_paths)

spark_session = SparkSession \
    .builder \
    .config(conf=conf)\
    .appName("TestStream") \
    .getOrCreate()

def from_avro(col, jsonFormatSchema): 
    sc = SparkContext._active_spark_context 
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema)) 


def to_avro(col): 
    sc = SparkContext._active_spark_context 
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col))) 

schema_registry_url = "http://schema-registry.org"
transaction_schema_name = "Transaction"

transaction_schema = requests.get(
    "{}/subjects/{}/versions/latest/schema".format(schema_registry_url, transaction_schema_name)
).text


raw_df = (
    spark_session.read.format("kafka")
    # SNIP
    .option("subscribe", "transaction")
    .option("startingOffsets", "earliest")
    .load()
)
raw_df = raw_df.limit(1000).cache()

extract_df = raw_df.select(
    raw_df["key"].cast("String"),
    from_avro(raw_df["value"], transaction_schema).alias("value")
)

# This shows data and fields
raw_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(3, truncate=False)

extract_df.show()

The value column is empty. I expect either an error because decoding failed, or the data to be there. Does anyone know what might cause this, or how to debug it?

+---+-----+
|key|value|
+---+-----+
|...| [[]]|
|...| [[]]|
|...| [[]]|
|...| [[]]|
OneCricketeer

1 Answer

You must deserialize the data manually. As of this writing, PySpark does not officially support the Confluent Schema Registry. You need to use either the KafkaAvroDeserializer provided by Confluent or ABRiS, a third-party Spark Avro library.

ABRiS: https://github.com/AbsaOSS/ABRiS#using-abris-with-python-and-pyspark

KafkaAvroDeserializer: Integrating Spark Structured Streaming with the Confluent Schema Registry

Reason: Confluent prepends 5 extra bytes to the Avro data: 1 magic byte followed by a 4-byte schema ID. The resulting layout, [Magic Byte|Schema ID|Avro data], is not plain Avro binary, which is why from_avro does not decode it as-is and you need to deserialize manually.
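To make that layout concrete, here is a minimal sketch in plain Python that splits one message into its schema ID and the remaining Avro bytes. The function name split_confluent_frame and the example payload are made up for illustration, and it assumes the schema ID is stored big-endian, as in the Confluent wire format.

```python
import struct

MAGIC_BYTE = 0  # the Confluent wire format starts with a zero byte


def split_confluent_frame(payload):
    """Split a Confluent-framed message into (schema_id, avro_bytes).

    Hypothetical helper for illustration, not part of any library.
    """
    if len(payload) < 5:
        raise ValueError("payload too short for a Confluent header")
    # ">bI": big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: {}".format(magic))
    return schema_id, payload[5:]


# Made-up message: schema ID 42 followed by three bytes standing in for Avro data
example = b"\x00" + (42).to_bytes(4, "big") + b"\x02\x06\x08"
schema_id, avro_bytes = split_confluent_frame(example)
print(schema_id, avro_bytes)
```

Running this over a few raw values collected from Kafka is also a quick way to check whether the topic really uses this framing. In Spark itself, a commonly used workaround along the same lines is to skip the header before decoding, for example from_avro(expr("substring(value, 6, length(value) - 5)"), transaction_schema) with expr imported from pyspark.sql.functions. That only works if every message on the topic was written with the same schema, since the schema ID is ignored.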

(Sorry I couldn't comment.)

Sheng-yi Hsu