I am reading messages from a Kafka topic:
messageDFRaw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test-message") \
    .load()
messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as dict")
When I print the DataFrame from the above query, I get the following console output:
|key|dict|
|#badbunny |{"channel": "#badbunny", "username": "mgat22", "message": "cool"}|
How can I create a DataFrame from the DataStreamReader output so that it has the columns |key|channel|username|message|?
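To make the target shape concrete, this is roughly the per-record transformation I am after, written in plain Python rather than Spark. It is only an illustration of the desired columns using the sample record above; `flatten_record` is a hypothetical helper name, not part of any Spark API.

```python
import json


def flatten_record(key, value):
    """Merge the Kafka key with the fields of the JSON-encoded value."""
    # `value` is the JSON string that currently ends up in the `dict` column.
    payload = json.loads(value)
    return {
        "key": key,
        # .get() yields None when a field is missing from the payload.
        "channel": payload.get("channel"),
        "username": payload.get("username"),
        "message": payload.get("message"),
    }


# Sample record taken from the console output above.
row = flatten_record(
    "#badbunny",
    '{"channel": "#badbunny", "username": "mgat22", "message": "cool"}',
)
print(row)
```

In other words, each JSON field should become its own top-level column next to key, instead of staying inside a single string column.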
I tried following the accepted answer to "How to read records in JSON format from Kafka using Structured Streaming?":
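from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

# Schema of the JSON payload carried in the Kafka message value
struct = StructType([
    StructField("channel", StringType()),
    StructField("username", StringType()),
    StructField("message", StringType()),
])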
messageDFRaw.select(from_json("CAST(value AS STRING)", struct))
but I get the message "Expected type 'StructField', got 'StructType' instead" in from_json().