I have a KStream
in which I want to count some dimension of the events. I do it as follows:
KTable<Windowed<Long>, Counter> ret = input.groupByKey()
.windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
.aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()));
I want to have a new KStream
with those aggregations as events. I can do it easily like this:
ret.toStream().to("output");
The problem is that every event in "input" topic will produce an event to "output" topic. I would like to publish an event to the output topic only when a window is finished. For example if the window is of one minute, send a single event per key per minute.
I think I can do it like this:
ret.toStream().foreach((k, v) -> sendToKafkaTopic("output"));
But I wonder if there's a better / more elegant way of doing this?