In this Databricks blog post they show how to extract JSON data from Kafka:
# Construct a streaming DataFrame that reads from topic1
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .option("startingOffsets", "earliest") \
    .load() \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, IntegerType, StringType

# value schema: { "a": 1, "b": "string" }
schema = StructType().add("a", IntegerType()).add("b", StringType())

df.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), schema))
Is there a way to create the schema from a known case class without manually defining all the fields?
Note: this is a Spark 2.2 doc, and I believe that in the Scala API you now have to add new in the schema line (IntegerType and StringType are singleton objects there, so they take no new):
val schema = new StructType().add("a", IntegerType).add("b", StringType)
I was able to figure this out thanks to this SO question.
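For future readers, here is a minimal sketch of the usual trick (assuming the linked answer takes the same route): derive the StructType from the case class via Spark's Encoders, so the fields never have to be listed by hand. The Payload case class name is just an illustration mirroring the example payload above.

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

// Hypothetical case class mirroring the JSON value: { "a": 1, "b": "string" }
case class Payload(a: Int, b: String)

// Derive the schema from the case class instead of spelling out each field;
// the resulting StructType can be passed straight to from_json.
val schema: StructType = Encoders.product[Payload].schema

Encoders.product is part of the public Spark SQL API, so this avoids relying on internal reflection helpers.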