I currently have some code that builds a KTable using aggregate:
inputTopic.groupByKey().aggregate(
        Aggregator::new,
        (key, value, aggregate) -> {
            someProcessingDoneHere;
            return aggregate;
        },
        Materialized.with(Serdes.String(), Serdes.String())
);
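(For context, inputTopic is just a KStream<String, String> read from a StreamsBuilder, roughly like this; the topic name is a placeholder:)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
// "input-topic" is a placeholder name; keys and values are plain strings
KStream<String, String> inputTopic =
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));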
Once a given number of messages has been received and aggregated for a single key, I would like to push the latest aggregation state to another topic and then delete the key from the table.
I can obviously use a plain Kafka producer and have something like:
inputTopic.groupByKey().aggregate(
        Aggregator::new,
        (key, value, aggregate) -> {
            someProcessingDoneHere;
            if (count > threshold) {
                producer.send(new ProducerRecord<String, String>("output-topic", key, aggregate));
                return null; // returning null removes the key from the KTable
            }
            return aggregate;
        },
        Materialized.with(Serdes.String(), Serdes.String())
);
But I'm looking for a more "Streams" approach.
Any hint?
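One direction I've been considering (not sure it's idiomatic) is to drop aggregate() altogether and use transform() with a key-value state store, so the final aggregate can be forwarded downstream and removed from the store in a single step. A rough sketch of what I mean, reusing builder and inputTopic from above; the store name, threshold, output topic, and the string-concatenation "aggregation" are all placeholders:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Register a store that keeps the running aggregate per key
StoreBuilder<KeyValueStore<String, String>> aggStore =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("agg-store"),   // placeholder store name
                Serdes.String(),
                Serdes.String());
builder.addStateStore(aggStore);

KStream<String, String> completed = inputTopic.transform(() ->
        new Transformer<String, String, KeyValue<String, String>>() {
            private KeyValueStore<String, String> store;
            private final int threshold = 10;                  // placeholder threshold

            @Override
            public void init(ProcessorContext context) {
                store = (KeyValueStore<String, String>) context.getStateStore("agg-store");
            }

            @Override
            public KeyValue<String, String> transform(String key, String value) {
                // Placeholder aggregation: the count is encoded as a "count|aggregate" prefix
                String previous = store.get(key);
                int count = previous == null ? 0 : Integer.parseInt(previous.split("\\|", 2)[0]);
                String aggregate = previous == null ? value : previous.split("\\|", 2)[1] + value;
                count++;

                if (count >= threshold) {
                    store.delete(key);                         // drop the key once it is emitted
                    return KeyValue.pair(key, aggregate);      // forward the final aggregate
                }
                store.put(key, count + "|" + aggregate);
                return null;                                   // emit nothing until the threshold is hit
            }

            @Override
            public void close() { }
        }, "agg-store");

completed.to("output-topic");                                  // placeholder output topic

But that feels like a lot of boilerplate compared to aggregate(), so I'd be happy to hear if there is a simpler DSL-level way to do this.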