
I need to create a Spark component that subscribes to Kafka topics, receives the data in JSON format, applies user-specified queries to that data, and ultimately writes the results to another Kafka topic. With my current understanding and research, I've come to the following skeleton of an implementation:

SparkSession spark = SparkSession
        .builder()
        .appName("StructuredStreamingTest")
        .master("local[*]")
        .getOrCreate();

// Take a batch of data to infer the schema
Dataset<String> sample = spark
                  .read()
                  .format("kafka")
                  .option("kafka.bootstrap.servers", "localhost:9092")
                  .option("subscribe", mytopic)
                  .load()
                  .selectExpr("CAST(value AS STRING)")
                  .as(Encoders.STRING());

StructType sch = spark.read().json(sample).schema();

// The actual stream
Dataset<String> lines = spark
              .readStream()
              .format("kafka")
              .options(kafkaParams)
              .option(subscribeType, topics)
              .load()
              .selectExpr("CAST(value AS STRING)")
              .as(Encoders.STRING());

This should return a Dataset object (a DataFrame in Scala, I believe) with a single column "value" containing the JSON payload of each record. Now, the actual question is: how can I parse that JSON into a Dataset according to the schema inferred above, apply an SQL query supplied as a single string, transform the resulting Dataset back to JSON, and write it to Kafka?
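For reference, here is the rough direction I am considering for the missing piece. This is only an untested sketch: I am assuming from_json/to_json behave the way I expect, and userQuery, the "events" view name, the output topic, and the checkpoint path are placeholders of my own:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;
import static org.apache.spark.sql.functions.*;

// Parse the JSON strings into typed columns using the schema inferred from the sample batch
Dataset<Row> parsed = lines
        .select(from_json(col("value"), sch).as("data"))
        .select("data.*");

// Expose the parsed data as a temp view so a user-supplied SQL string can be run against it
parsed.createOrReplaceTempView("events");            // "events" is a placeholder view name
Dataset<Row> result = spark.sql(userQuery);          // e.g. "SELECT * FROM events WHERE ..."

// Serialize each result row back to a JSON string and stream it to the output topic
StreamingQuery query = result
        .selectExpr("to_json(struct(*)) AS value")
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "output-topic")              // placeholder output topic
        .option("checkpointLocation", "/tmp/checkpoint")
        .start();

query.awaitTermination();                             // throws StreamingQueryException

Is something along these lines the intended way to do it, or is there a better approach?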

EDIT: Is this really a duplicate, given that the referenced post is in Scala?
