I am reading messages from a Kafka topic:

messageDFRaw = spark.readStream\
                    .format("kafka")\
                    .option("kafka.bootstrap.servers", "localhost:9092")\
                    .option("subscribe", "test-message")\
                    .load()

messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as dict")

When I print the DataFrame from the query above, I get the following console output:

|key|dict|
|#badbunny |{"channel": "#badbunny", "username": "mgat22", "message": "cool"}|

How can I turn this streaming DataFrame into one whose columns are |key|channel|username|message|?

I tried following the accepted answer to "How to read records in JSON format from Kafka using Structured Streaming?":

struct = StructType([
    StructField("channel", StringType()),
    StructField("username", StringType()),
    StructField("message", StringType()),
])

messageDFRaw.select(from_json("CAST(value AS STRING)", struct))

but I get "Expected type 'StructField', got 'StructType' instead" in from_json().

1 Answer

I ignored the warning "Expected type 'StructField', got 'StructType' instead" in from_json(). It looks like an IDE type-hint inspection rather than a Spark runtime error.

However, I had to cast the Kafka message value to a string first and only then parse the JSON against the schema.

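from pyspark.sql.functions import from_json

messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# struct_schema is the StructType from the question (named struct there)
messageParsedDF = messageDF.select("key", from_json("value", struct_schema).alias("message"))

# The parsed struct is aliased as "message", so its fields are addressed as message.<field>
messageFlattenedDF = messageParsedDF.selectExpr("key", "message.channel", "message.username", "message.message")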
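
To make the cast-then-parse order easier to see, here is a minimal plain-Python sketch of what happens to a single Kafka record. It does not use Spark at all: flatten_record and FIELDS are hypothetical names introduced only for this illustration, and the way it maps malformed JSON to None values is a simplification of from_json, not a description of its exact behavior.

```python
import json

# Field names taken from the StructType in the question.
FIELDS = ("channel", "username", "message")


def flatten_record(key, value):
    """Model of cast-then-parse for one Kafka record (hypothetical helper, not a Spark API)."""
    # Step 1: Kafka delivers key and value as bytes, so decode them to strings first.
    key_str = key.decode("utf-8") if key is not None else None
    value_str = value.decode("utf-8") if value is not None else None

    # Step 2: parse the JSON text; malformed or missing input yields no fields.
    try:
        parsed = json.loads(value_str) if value_str is not None else {}
    except json.JSONDecodeError:
        parsed = {}
    if not isinstance(parsed, dict):
        parsed = {}

    # Step 3: flatten into one column per schema field, using None for absent fields.
    row = {"key": key_str}
    for name in FIELDS:
        row[name] = parsed.get(name)
    return row


row = flatten_record(
    b"#badbunny",
    b'{"channel": "#badbunny", "username": "mgat22", "message": "cool"}',
)
print(row)
```

In PySpark itself, the flattening step can also be written as select("key", "message.*"), which expands every field of the parsed struct into its own column.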