
I am trying to read a structured stream from Kafka with PySpark. Everything works wonders as long as there is no schema registry involved. I recently wanted to integrate a schema registry on my cluster for obvious consistency reasons, however PySpark just doesn't want to cooperate :/

I tried both Avro and JSON schemas but to no avail: the Avro values are mapped to empty strings or 0s, and the JSON values directly to null...

Here is my code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, from_utc_timestamp, col, lit, udf
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
import logging
# logging.basicConfig(level=logging.INFO)
import os

# jars found in /home/ettore/.ivy2/cache
# url = jar:file:/home/ettore/Documents/Portfolio/Spark%20Streams/venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml

spark = SparkSession \
    .builder \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.spark:spark-avro_2.12:3.4.0") \
    .appName("KafkaStructuredStreams") \
    .getOrCreate()

# spark.sparkContext.setLogLevel("INFO")


# Schema conversion: Define the schema Struct
temperature_schema = StructType([
    StructField("city", StringType()),
    StructField("reading", IntegerType()),
    StructField("unit", StringType()),
    StructField("timestamp", IntegerType())
])

KEY_SCHEMA_STR = """
{
    "type": "record",
    "name": "CustomerKey",
    "fields": [
        {"name": "key", "type": "string"}
    ]
}
"""

VALUE_SCHEMA_STR = """
{
   "type": "record",
   "name": "CustomerType",
   "fields": [
      {"name": "name", "type": "string"},
      {"name": "age", "type": "int"}
   ]
}
"""

AVRO_STRUCT_SCHEMA = (StructType()
                      .add("name", StringType())
                      .add("age", IntegerType())
                      )

# Read stream

TOPIC = 'temp_readings'
TOPIC_NAME = 'CUSTOMER_TOPIC'

# Avro
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", TOPIC_NAME) \
    .option("startingOffsets", "earliest") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .load() \
    .select(
        col("key").cast("STRING").alias("customer_key"),
        from_avro(col("value"), VALUE_SCHEMA_STR).alias("customer_value")
        # from_avro(col("value"), VALUE_SCHEMA_STR, {"mode": "PERMISSIVE"}).alias("customer_value")
    )

# JSON
# df = spark \
#     .readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "localhost:9092") \
#     .option("subscribe", TOPIC) \
#     .option("startingOffsets", "latest") \
#     .option("kafka.sasl.mechanism", "PLAIN") \
#     .load() \
#     .select(
#         col("key").cast("STRING"),
#         from_json(col("value").cast("STRING"), schema=temperature_schema).alias("temp_value")
#     )


### Extract individual columns from the JSON struct
# Avro
# df_with_columns = df.select(
#     col("customer_key"),
#     col("customer_value.name").alias("Name"),
#     col("customer_value.age").alias("Age")
# )

# JSON
# df_with_columns = df.select(
#     col("key"),
#     col("temp_value.city").alias("city"),
#     col("temp_value.reading").alias("reading"),
#     col("temp_value.unit").alias("unit"),
#     col("temp_value.timestamp").alias("timestamp")
# )

# QUERYING

# Filter the data based on the "reading" field
# filtered_df = df_with_columns.filter("reading > 10 OR reading = 0")

# Group and count data by "key"
# result_df = filtered_df.groupBy("key").count()

""" Console streams """

query = df.writeStream \
    .trigger(processingTime='1 seconds') \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

From the documentation, this is how you're supposed to do it, I think.

  • Remove `spark-avro` dependency. It doesn't do what you want, since the binary format used with the registry isn't the same. Check out https://www.confluent.io/blog/consume-avro-data-from-kafka-topics-and-secured-schema-registry-with-databricks-confluent-cloud-on-azure/#step-7 – OneCricketeer Aug 05 '23 at 16:51
  • I saw that post, but frankly that's quite old. There is the 'from_avro()' function now. What is the purpose of that then if it can't convert an Avro schema? Respectfully, I don't think this was already answered. – ETisREAL Aug 05 '23 at 17:06
  • There has always been `from_avro()`, since Spark 2.3-ish... It **does not** integrate with any registry. The other post shows how you can do that (old doesn't matter if it is still accurate) – OneCricketeer Aug 05 '23 at 17:13
  • As shown in the duplicate post, you need to extract out the actual Avro data in the value bytes - https://stackoverflow.com/a/76306315/2308683 – OneCricketeer Aug 05 '23 at 17:14
  • Ok, I hoped by now there would have been a better solution. Maybe I'll actually get my hands dirty here. Thank you for your patience @OneCricketeer – ETisREAL Aug 05 '23 at 17:29
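
Per the comments, the gist of the fix is that Confluent's registry-aware serializers do not write plain Avro: every message starts with a magic byte plus a 4-byte schema ID, so from_avro() sees the header as garbage. Below is a minimal sketch of that workaround, reusing spark, TOPIC_NAME and VALUE_SCHEMA_STR from the code above; it assumes the producer uses Confluent's wire format, and it does not talk to the registry at all, so the schema string has to match the writer's schema.

from pyspark.sql.functions import col, expr
from pyspark.sql.avro.functions import from_avro

# Confluent wire format: 1 magic byte (0x00) + 4-byte schema ID + Avro payload.
# Strip the 5-byte header before decoding; substring() is 1-based, so the
# Avro body starts at position 6.
raw_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", TOPIC_NAME) \
    .option("startingOffsets", "earliest") \
    .load()

decoded_df = raw_df.select(
    # A plain string key can simply be cast; a registry-serialized key
    # would need the same 5-byte slicing as the value.
    col("key").cast("string").alias("customer_key"),
    from_avro(
        expr("substring(value, 6, length(value) - 5)"),
        VALUE_SCHEMA_STR
    ).alias("customer_value")
)

The same header would also explain the JSON Schema case coming back as null: the leading bytes make the cast-to-string value unparseable, so from_json() falls back to null. If the producer uses Confluent's JSON Schema serializer, slicing first should work there too, e.g. from_json(expr("substring(value, 6, length(value) - 5)").cast("string"), temperature_schema).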
