
I'm building a Spark SQL app that consumes from a Kafka topic, transforms some data, then writes the result back to a separate Kafka topic as a specific JSON object.

I've got most of this working: I can consume, transform, and write back to Kafka. It's the shape of the JSON object being written after the transformation that I'm struggling with.

Right now I'm able to query/transform what I want and write it:

Dataset<Row> reader = myData.getRecordCount();
reader.select(to_json(struct("record_count")).alias("value"))
    .write()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "new_separate_topic")
    .save();

This produces a record like this, in the topic:

{
  "record_count": 989
}

What I need is for this bit of JSON to be a payload (child) property of a larger JSON object, which we use as a standard consumer object for our microservices.

What I want to write to the topic actually looks like this:

{
  "id": "ABC123",
  "timestamp": "2018-11-16 20:40:26.108",
  "user": "DEF456",
  "type": "new_entity",
  "data": {
      "record_count": 989
    }
}

Also, the "id", "user", and "type" fields would be populated from the outside - they would come from the original Kafka message that triggers the whole process. Basically, I need to inject some values for the metadata/object I want to write to Kafka, and set the "data" field to the outcome of the Spark SQL query above.

Is this possible? How? Thanks!

  • Possible duplicate of [How to add a constant column in a Spark DataFrame?](https://stackoverflow.com/questions/32788322/how-to-add-a-constant-column-in-a-spark-dataframe) – zero323 Nov 17 '18 at 17:43
  • FWIW, reading from Kafka and writing to Kafka is the strong-point of Kafka Streams applications. And you don't need a cluster scheduler like Spark – OneCricketeer Nov 17 '18 at 20:53
  • Why the downvote!? I realize it's a possible duplicate but I didn't know *what* I was looking for, hence the question. – Tsar Bomba Nov 18 '18 at 17:34
  • @cricket_007 Not sure I follow. I need spark to do data transformations in the middle of the operation. Kafka is just triggering the transformation, which writes a completely new message & payload to a different topic. It's not data moving directly from one topic to another, untouched. – Tsar Bomba Nov 18 '18 at 17:35
  • Kafka Streams is an embeddable Java application specifically designed for such transformations. Refer to the documentation, https://kafka.apache.org/20/documentation/streams/tutorial – OneCricketeer Nov 18 '18 at 18:32

1 Answer

If you want to add new fields, then you cannot select only the one column you already have.

E.g. between your .select() and the .write().format("kafka"), you need to do something like withColumn()

Dataset<Row> reader = myData.getRecordCount();

// Keep your Dataset as columns
reader = reader.select("record_count");

// Add more data
reader = reader.withColumn(...);

// Then convert the struct to JSON and write the output
reader.select(to_json(...))
    .write()
    .format("kafka")

the "id", "user", and "type" fields would be populated from the outside - they would come from the original Kafka message that triggers the whole process

Then you need to include select("id", "user", "type") in your code as well.
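Putting that together, a rough sketch in Spark SQL / Java might look like the following. The literal values for "id", "user", and "type" are placeholders here; in your case you would pull them from the triggering Kafka message (or pass them in from your driver code), and getRecordCount() is assumed to expose a "record_count" column as in your snippet:

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical metadata values taken from the triggering Kafka message
String id = "ABC123";
String user = "DEF456";
String type = "new_entity";

Dataset<Row> counts = myData.getRecordCount();   // assumed to have a "record_count" column

Dataset<Row> envelope = counts
    // Inject the metadata fields as literal columns
    .withColumn("id", lit(id))
    .withColumn("timestamp", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss.SSS"))
    .withColumn("user", lit(user))
    .withColumn("type", lit(type))
    // Nest the query result under a "data" struct
    .withColumn("data", struct(col("record_count")));

// Serialize the whole envelope into a single JSON string column named "value"
envelope.select(to_json(struct(col("id"), col("timestamp"), col("user"), col("type"), col("data"))).alias("value"))
    .write()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "new_separate_topic")
    .save();

date_format(current_timestamp(), ...) is just one way to produce a timestamp in the format you showed; you could equally carry the timestamp over from the source message with another lit() column.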

Another option is to use Kafka Streams rather than being forced into manipulating Datasets; there you can work with actual Java classes / JSONObjects.
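For comparison, a minimal Kafka Streams sketch could look like the following. The topic names, application id, and envelope values are made up here, Jackson is assumed for JSON handling, and this only shows wrapping each incoming record under a "data" field; any aggregation you currently do in Spark would need its own Streams logic:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class EnvelopeStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "envelope-app");       // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        ObjectMapper mapper = new ObjectMapper();
        StreamsBuilder builder = new StreamsBuilder();

        builder.<String, String>stream("input_topic")                         // hypothetical source topic
            .mapValues(value -> {
                try {
                    // Wrap the incoming JSON under the "data" field of the standard envelope
                    ObjectNode envelope = mapper.createObjectNode();
                    envelope.put("id", "ABC123");                             // would come from the source message
                    envelope.put("timestamp", java.time.Instant.now().toString());
                    envelope.put("user", "DEF456");
                    envelope.put("type", "new_entity");
                    envelope.set("data", mapper.readTree(value));
                    return mapper.writeValueAsString(envelope);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            })
            .to("new_separate_topic");

        new KafkaStreams(builder.build(), props).start();
    }
}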

  • Thanks very much! This did the trick. I was able to build a new message w/ transformed payload this way. I explained why I needed what I posted in a comment in the OP. Would be interested in your feedback. – Tsar Bomba Nov 18 '18 at 17:37