
I was trying to reproduce the example from [Databricks][1] and apply it to the new Kafka connector for Spark Structured Streaming. However, I cannot parse the JSON correctly using Spark's out-of-the-box methods...

Note: the messages in the topic are written to Kafka in JSON format.

    val ds1 = spark
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", IP + ":9092")
                .option("zookeeper.connect", IP + ":2181")
                .option("subscribe", TOPIC)
                .option("startingOffsets", "earliest")
                .option("max.poll.records", 10)
                .option("failOnDataLoss", false)
                .load()

The following code doesn't work. I believe that's because the json column is a string and doesn't match the from_json method signature...

    val df = ds1.select($"value" cast "string" as "json")
                .select(from_json("json") as "data")
                .select("data.*")

Any tips?

[UPDATE] Working example: https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala

OneCricketeer
carlos rodrigues
1 Answer


First, you need to define the schema for your JSON message, for example:

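    import org.apache.spark.sql.types.StructType
    import spark.implicits._  // enables the $"colName" syntax

    val schema = new StructType()
      .add($"id".string)
      .add($"name".string)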
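If spark.implicits._ is not in scope (the $"..." shorthand comes from it), the same two-field schema can also be written with explicit StructFields or with the add(name, dataType) overload. This is a small sketch assuming only that spark-sql is on the classpath; schema2 is just an illustrative name.

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Same schema as above, built from explicit StructFields instead of the
// $"...".string shorthand, so no SparkSession or implicits are required.
val schema = StructType(Seq(
  StructField("id", StringType),   // nullable defaults to true
  StructField("name", StringType)
))

// Equivalent fluent form using add(name: String, dataType: DataType).
val schema2 = new StructType()
  .add("id", StringType)
  .add("name", StringType)
```

Either value can be passed to from_json in place of the schema built with $"id".string.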

Now you can pass this schema to the from_json method, like below:

    import org.apache.spark.sql.functions.from_json

    val df = ds1.select($"value" cast "string" as "json")
                .select(from_json($"json", schema) as "data")
                .select("data.*")
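To check the parsing step without a running Kafka broker, the same select chain can be run against a small static DataFrame whose binary value column mimics the one the Kafka source produces. This is a local sketch assuming spark-sql on the classpath; the sample records and the fakeKafka name are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructType}

// Local session for a quick test; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("from_json-demo")
  .getOrCreate()
import spark.implicits._

val schema = new StructType()
  .add("id", StringType)
  .add("name", StringType)

// Hypothetical stand-in for the Kafka source: a binary `value` column
// holding UTF-8 encoded JSON, like the value column Kafka records arrive in.
val fakeKafka = Seq(
  """{"id":"1","name":"alice"}""",
  """{"id":"2","name":"bob"}"""
).map(_.getBytes("UTF-8")).toDF("value")

// Same chain as the answer: binary -> string -> struct -> flattened columns.
val parsed = fakeKafka
  .select($"value" cast "string" as "json")
  .select(from_json($"json", schema) as "data")
  .select("data.*")

parsed.show()

// Columns are mapped to the tuple by position (id, name).
val rows = parsed.as[(String, String)].collect()
```

Swapping fakeKafka for the streaming ds1 gives the answer's code unchanged; with a stream, the result is started with writeStream instead of calling show().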
Jacek Laskowski
abaghel
  • If you get the compiler error "value $ is not a member...", don't forget to import spark.implicits._. It took me an additional 5-10 minutes to figure out. – user1459144 Mar 22 '17 at 17:06
  • For me the question is: which library provides the function called "from_json"? I can't seem to find it. Help, please. – Raghav May 19 '17 at 17:59
  • @Raghav: import org.apache.spark.sql.functions._ and check the example here: https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala – carlos rodrigues May 21 '17 at 13:41
  • @abaghel, I was referring to your blog here: https://www.programcreek.com/java-api-examples/?class=org.apache.spark.sql.streaming.StreamingQuery&method=awaitTermination. Without a Java bean, can't we parse/infer the JSON data? – Satya Pavan Jun 17 '20 at 22:57
  • @abaghel I can't see the console output even though it prints isStreaming as true. – tharindu Mar 09 '21 at 03:09
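One comment above asks whether the JSON can be parsed without declaring the schema by hand. from_json itself still needs a schema, but one possible workaround is to infer it once from a representative sample message with the batch JSON reader and then pass the inferred StructType to from_json. This is a sketch under assumptions: the sample message is hypothetical, spark-sql is on the classpath, and reading JSON from a Dataset[String] requires Spark 2.2 or later.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("infer-schema-demo")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample; in practice, copy one real message from the topic.
val sampleJson = """{"id":"1","name":"alice"}"""

// The batch JSON reader infers a StructType from the sample message.
val inferredSchema = spark.read.json(Seq(sampleJson).toDS()).schema

// Use the inferred schema exactly like a hand-written one.
val parsed = Seq(sampleJson).toDF("json")
  .select(from_json($"json", inferredSchema) as "data")
  .select("data.*")
```

The inferred schema only covers fields present in the sample, so fields that appear only in later messages would not be parsed.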