I'm building a Spark SQL app that consumes from a Kafka topic, transforms some data, and then writes back to a separate Kafka topic with a specific JSON object.
I've got most of this working - I can consume, transform, and write back to Kafka - it's the shape of the JSON object being written after the transformation that I'm struggling with.
Right now I'm able to query/transform what I want and write it:
import static org.apache.spark.sql.functions.*;

Dataset<Row> reader = myData.getRecordCount();
reader.select(to_json(struct("record_count")).alias("value"))
      .write()
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "new_separate_topic")
      .save();
This produces a record like this in the topic:
{
  "record_count": 989
}
What I need is for this bit of JSON to be a payload (child) property of a larger JSON object, which we use as a standard consumer object for our microservices.
What I want to write to the topic actually looks like this:
{
  "id": "ABC123",
  "timestamp": "2018-11-16 20:40:26.108",
  "user": "DEF456",
  "type": "new_entity",
  "data": {
    "record_count": 989
  }
}
Also, the "id", "user", and "type" fields would be populated from the outside - they would come from the original Kafka message that triggers the whole process. Basically, I need to inject some values for the metadata/object I want to write to Kafka, and set the "data" field to the outcome of the Spark SQL query above.
Is this possible? How? Thanks!