I'm doing a proof of concept with Kafka, Spark, and Jupyter notebooks, and I'm running into a strange issue. I'm trying to read Avro records from Kafka into PySpark, using the Confluent Schema Registry to fetch the schema for deserializing the Avro messages. After deserialization, the resulting column in the Spark DataFrame is empty, with no errors raised. The column should contain data, because when the raw value is cast to a string some of the Avro fields are readable.
I have also tried this in the spark-shell in Scala (without Jupyter), and with both a Docker-based Spark and a standalone install of Spark.
I followed this SO question to get the from_avro and to_avro functions: Pyspark 2.4.0, read avro from kafka with read stream - Python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Column
from pyspark.sql.column import _to_java_column
import requests

jars = ["kafka-clients-2.0.0.jar", "spark-avro_2.11-2.4.3.jar", "spark-sql-kafka-0-10_2.11-2.4.3.jar"]
jar_paths = ",".join(["/home/jovyan/work/jars/{}".format(jar) for jar in jars])

conf = SparkConf()
conf.set("spark.jars", jar_paths)

spark_session = SparkSession \
    .builder \
    .config(conf=conf) \
    .appName("TestStream") \
    .getOrCreate()
def from_avro(col, jsonFormatSchema):
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema))

def to_avro(col):
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col)))
schema_registry_url = "http://schema-registry.org"
transaction_schema_name = "Transaction"
transaction_schema = requests.get(
    "{}/subjects/{}/versions/latest/schema".format(schema_registry_url, transaction_schema_name)
).text
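One thing worth sanity-checking here: the registry's `.../versions/latest` endpoint (without the trailing `/schema`) returns a JSON wrapper object, while `.../versions/latest/schema` returns the bare schema, and `from_avro` needs the bare schema. A minimal check, using a hypothetical sample response:

```python
import json

def check_registry_response(body):
    """Return the parsed Avro schema if `body` looks like a bare schema,
    or raise if it looks like the registry's JSON wrapper instead."""
    parsed = json.loads(body)
    # The .../versions/latest endpoint (without /schema) returns a wrapper
    # like {"subject": ..., "version": ..., "id": ..., "schema": "<escaped schema>"}.
    if isinstance(parsed, dict) and "subject" in parsed and "schema" in parsed:
        raise ValueError("Got the registry wrapper, not the bare schema; "
                         "unwrap parsed['schema'] first")
    return parsed

# Hypothetical bare-schema response, as .../versions/latest/schema returns it.
bare = '{"type": "record", "name": "Transaction", "fields": [{"name": "amount", "type": "double"}]}'
schema = check_registry_response(bare)
print(schema["name"])  # Transaction
```

If `json.loads(transaction_schema)` raises or yields the wrapper shape, `from_avro` is being handed the wrong schema string.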
raw_df = (spark_session.read.format("kafka")
    # SNIP
    .option("subscribe", "transaction")
    .option("startingOffsets", "earliest")
    .load())
raw_df = raw_df.limit(1000).cache()
extract_df = raw_df.select(
    raw_df["key"].cast("String"),
    from_avro(raw_df["value"], transaction_schema).alias("value")
)
# This shows data and fields
raw_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(3, truncate=False)
extract_df.show()
The contents of the value column are empty. I would expect either an error, since the decoding failed, or the data to be there. Does anyone know what might cause this, or how to debug it?
+---+-----+
|key|value|
+---+-----+
|...| [[]]|
|...| [[]]|
|...| [[]]|
|...| [[]]|
+---+-----+
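One debugging step is to look at the raw bytes of a message. Values produced with the Confluent Avro serializer are not plain Avro: they are prefixed with a 5-byte wire-format header (magic byte 0x00 plus a 4-byte big-endian schema ID) that Spark's `from_avro` knows nothing about. A small sketch to check for that header (the sample payload below is made up):

```python
import struct

def inspect_confluent_header(raw):
    """Check whether a Kafka message value carries the Confluent
    wire-format header: magic byte 0x00 followed by a 4-byte
    big-endian schema ID, then the actual Avro payload."""
    if len(raw) > 5 and raw[0] == 0:
        schema_id = struct.unpack(">I", raw[1:5])[0]
        return schema_id, raw[5:]  # Avro body starts after the header
    return None, raw  # no Confluent framing detected

# Hypothetical value bytes: 5-byte header (schema ID 42) + fake Avro body.
sample = b"\x00\x00\x00\x00\x2a" + b"\x02\x08abcd"
schema_id, body = inspect_confluent_header(sample)
print(schema_id)  # 42
```

In the notebook, something like `raw_df.select("value").first()["value"]` would give the raw bytes to feed into this. If the header is present, `from_avro` is consuming five bytes of framing before the actual record, which could plausibly produce silently empty structs rather than an error.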