My Kafka Streams aggregation reads a compacted topic and does this:
(0_10, ..)
(0_11, ..)
--->
(0, [10])
(0, [10, 11])
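To make the current behavior concrete, here is a minimal plain-Java sketch (no Kafka involved) of what the aggregation appears to do: it splits each key on the underscore, groups by the prefix, and emits the updated list after every single record. The class name `PerRecordAggregation` and the helper `aggregate` are hypothetical and exist only for this illustration; they are not part of my application or of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerRecordAggregation {

    // Hypothetical helper: takes keys of the form "<group>_<id>" and returns
    // one formatted aggregate per input record, i.e. no batching at all.
    static List<String> aggregate(List<String> keys) {
        Map<String, List<Integer>> state = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String key : keys) {
            String[] parts = key.split("_", 2);
            String group = parts[0];
            int id = Integer.parseInt(parts[1]);
            // Append the id to the list kept for this group.
            List<Integer> ids = state.computeIfAbsent(group, g -> new ArrayList<>());
            ids.add(id);
            // Emit the updated aggregate immediately, once per record.
            emitted.add("(" + group + ", " + ids + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        aggregate(Arrays.asList("0_10", "0_11")).forEach(System.out::println);
    }
}
```

Every input record produces one output record, which is exactly the behavior I would like to relax.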
I would like to know how to control the aggregation time window, so that it doesn't emit a message for each incoming message, but instead waits and aggregates several of them. Imagine the Streams app consumes these messages:
(0_10, ..)
(1_11, ..)
(0_13, ..)
If these 3 messages arrive within a short time window, I expect to see this:
(0, [10])
(0, [10, 13])
(1, [11])
I cannot figure out how to tell my Kafka Streams application how long to wait for more records to aggregate before emitting a new value.
My code is very simple:
builder
    .table(keySerde, valueSerde, sourceTopic)
    .groupBy(StreamBuilder::groupByMapper)
    .aggregate(
        StreamBuilder::aggregateInitializer,
        StreamBuilder::aggregateAdder,
        StreamBuilder::aggregateSubtractor)
    .to(...);
Currently, it sometimes aggregates in batches, but I am not sure how to tune this:
{"Aggregate":[100]}
{"Aggregate":[100,300,301,302]}
{"Aggregate":[100,300,301,302,404]}
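One possible explanation, which is an assumption and not something confirmed above: Kafka Streams keeps a record cache in front of the aggregation's state store and forwards results downstream when the cache fills up or on commit. If that is the cause of the occasional batching, the two settings below would be the ones to experiment with. The config names `cache.max.bytes.buffering` and `commit.interval.ms` are real Kafka Streams settings; the class name, the helper `batchingProps`, and the values are placeholders for illustration only.

```java
import java.util.Properties;

public class BatchingConfigSketch {

    // Hypothetical helper: builds only the two settings assumed to influence
    // how often aggregation results are forwarded downstream.
    static Properties batchingProps(long cacheBytes, long commitIntervalMs) {
        Properties props = new Properties();
        // Larger cache: more updates per key can be merged before forwarding.
        props.put("cache.max.bytes.buffering", Long.toString(cacheBytes));
        // Longer commit interval: the cache is flushed less often.
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values: 10 MB of cache, flush roughly every 5 seconds.
        Properties props = batchingProps(10L * 1024 * 1024, 5000L);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would be merged into the configuration passed to the Streams application. As far as I understand, this only reduces the number of intermediate updates and does not guarantee a fixed wait time.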