I am trying to read a structured stream from Kafka in PySpark. Everything works fine as long as there is no schema registry involved. I recently wanted to integrate a schema registry on my cluster for obvious consistency reasons, but PySpark just doesn't want to cooperate :/
I tried both Avro and JSON schemas, but to no avail: with the Avro schema the values come back as empty strings or 0s, and with the JSON schema they come back as null...
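To rule out the producer side, the kind of check I have in mind is a plain batch read of the same topic that just hex-dumps the raw key and value bytes. This is only a sketch; it reuses the SparkSession, broker address, and topic name from the code below:

from pyspark.sql.functions import col, hex

# Batch read of the same topic, purely to look at the raw bytes (hex-encoded)
raw_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "CUSTOMER_TOPIC") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

raw_df.select(
    hex(col("key")).alias("key_hex"),
    hex(col("value")).alias("value_hex")
).show(5, truncate=False)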
Here is my code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, from_utc_timestamp, col, lit, udf
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
import logging
# logging.basicConfig(level=logging.INFO)
import os
# jars found in /home/ettore/.ivy2/cache
# url = jar:file:/home/ettore/Documents/Portfolio/Spark%20Streams/venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
spark = SparkSession \
    .builder \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.spark:spark-avro_2.12:3.4.0") \
    .appName("KafkaStructuredStreams") \
    .getOrCreate()
# spark.sparkContext.setLogLevel("INFO")
# Schema conversion: Define the schema Struct
temperature_schema = StructType([
    StructField("city", StringType()),
    StructField("reading", IntegerType()),
    StructField("unit", StringType()),
    StructField("timestamp", IntegerType())
])
KEY_SCHEMA_STR = """
{
    "type": "record",
    "name": "CustomerKey",
    "fields": [
        {"name": "key", "type": "string"}
    ]
}
"""
VALUE_SCHEMA_STR = """
{
    "type": "record",
    "name": "CustomerType",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}
"""
AVRO_STRUCT_SCHEMA = (StructType()
    .add("name", StringType())
    .add("age", IntegerType())
)
# Read stream
TOPIC = 'temp_readings'
TOPIC_NAME = 'CUSTOMER_TOPIC'
# Avro
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", TOPIC_NAME) \
    .option("startingOffsets", "earliest") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .load() \
    .select(
        col("key").cast("STRING").alias("customer_key"),
        from_avro(col("value"), VALUE_SCHEMA_STR).alias("customer_value")
        # from_avro(col("value"), VALUE_SCHEMA_STR, {"mode": "PERMISSIVE"}).alias("customer_value")
    )
# JSON
# df = spark \
#     .readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "localhost:9092") \
#     .option("subscribe", TOPIC) \
#     .option("startingOffsets", "latest") \
#     .option("kafka.sasl.mechanism", "PLAIN") \
#     .load() \
#     .select(
#         col("key").cast("STRING"),
#         from_json(col("value").cast("STRING"), schema=temperature_schema).alias("temp_value")
#     )
# Extract individual columns from the decoded struct
# Avro
# df_with_columns = df.select(
#     col("customer_key"),
#     col("customer_value.name").alias("Name"),
#     col("customer_value.age").alias("Age")
# )
# JSON
# df_with_columns = df.select(
#     col("key"),
#     col("temp_value.city").alias("city"),
#     col("temp_value.reading").alias("reading"),
#     col("temp_value.unit").alias("unit"),
#     col("temp_value.timestamp").alias("timestamp")
# )
# QUERYING
# Filter the data based on the "reading" field
# filtered_df = df_with_columns.filter("reading > 10 OR reading = 0")
# Group and count data by "key"
# result_df = filtered_df.groupBy("key").count()
""" Console streams """
query = df.writeStream \
    .trigger(processingTime='1 seconds') \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()
query.awaitTermination()
As far as I can tell from the documentation, this is how you're supposed to do it.
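For what it's worth, a local round trip through to_avro/from_avro with the same schema string (no Kafka or registry involved) should at least confirm that the schema string itself is usable by Spark. A minimal sketch, with made-up field values:

from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.functions import col, lit, struct

# Serialize one made-up record with VALUE_SCHEMA_STR, then decode it again
roundtrip_df = spark.range(1).select(
    to_avro(
        struct(lit("test_name").alias("name"), lit(42).alias("age")),
        VALUE_SCHEMA_STR
    ).alias("value")
)
roundtrip_df.select(
    from_avro(col("value"), VALUE_SCHEMA_STR).alias("decoded")
).show(truncate=False)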