
My Kafka Streams aggregation reads a compacted topic and does this:

(0_10, ..), (0_11, ..) ---> (0, [10]), (0, [10, 11])

I would like to know how to control the aggregation time window, so it doesn't spit out a message for each incoming message, but waits and aggregates some of them. Imagine the Streams app consumes these messages:

  • (0_10, ..)
  • (1_11, ..)
  • (0_13, ..)

and if the 3 previous messages arrive within a short time window, I expect to see this:

  • (0,[10])
  • (0, [10, 13])
  • (1, [11])

I cannot figure out how to tell my Kafka Streams application how long to wait for more aggregations before spitting out a new value.

My code is very simple:

builder
    .table(keySerde, valueSerde, sourceTopic)
    .groupBy(StreamBuilder::groupByMapper)
    .aggregate(
        StreamBuilder::aggregateInitializer,
        StreamBuilder::aggregateAdder,
        StreamBuilder::aggregateSubtractor)
    .to(...);

Currently it sometimes aggregates in batches, but I'm not sure how to tweak it:

{"Aggregate":[100]}
{"Aggregate":[100,300,301,302]}
{"Aggregate":[100,300,301,302,404]}
eddyP23
    This new feature might be relevant now: https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results – Dmitry Minkovsky Feb 06 '19 at 20:36

1 Answer


I would like to know how to control aggregation time-window, so it doesn't spit a message for each incoming message, but waits and aggregates some of them.

This is not possible with Kafka Streams' windowing. Generally speaking, Kafka Streams windows don't "close" or "end": you can't tell it to produce a single final result once a window "closes" (there's no such concept). This is to accommodate late-arriving records. You will see updates as messages arrive at the aggregation window. How frequently Kafka Streams spits out those updates depends on caching (see below). For more, see: How to send final kafka-streams aggregation result of a time windowed KTable?

Currently, it sometime aggregates in batches, but not sure how to tweak it:

What you're seeing there is most likely the result of caching in the stores that back the KTables. KTables only forward messages downstream when their changelogs are flushed and their offsets are committed. This is to maintain consistency in case their state needs to be restored. If you increase your Kafka Streams application's commit interval, cache flushes will be less frequent, and consequently you will see fewer updates forwarded from KTables (changelogs, aggregations, etc.). But that's not related to windowing.
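For example, a minimal configuration sketch (the application id, bootstrap server, and the specific values are placeholders, not recommendations):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregation-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
// Commit (and flush caches, forwarding KTable updates) every 60s instead of the 30s default.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60_000);
// A larger record cache also means fewer intermediate updates get emitted.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
```

Note that a larger commit interval also delays offset commits, so there's a trade-off with how much work may be reprocessed after a failure.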

With all that said, if you want a windowed aggregate of a changelog stream, you can convert the KTable to a KStream using KTable#toStream(). Then you can specify windows in your aggregation step.
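A sketch of that shape using the newer DSL (topic names, Serdes, the `key.split("_")[0]` grouping, and the 30-second window size are all assumptions for illustration):

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();

builder
    .table("source-topic", Consumed.with(Serdes.String(), Serdes.String()))
    // Convert the changelog into a record stream so it can be windowed.
    .toStream()
    // Group by the prefix of keys like "0_10" (assumed key format).
    .groupBy((key, value) -> key.split("_")[0],
             Grouped.with(Serdes.String(), Serdes.String()))
    // Collect values into 30-second tumbling windows.
    .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
    .aggregate(
        () -> "",
        (key, value, agg) -> agg.isEmpty() ? value : agg + "," + value,
        Materialized.with(Serdes.String(), Serdes.String()))
    // Drop the window from the key to keep the sink serde simple.
    .toStream((windowedKey, agg) -> windowedKey.key())
    .to("sink-topic", Produced.with(Serdes.String(), Serdes.String()));
```

On Kafka 2.1+ you can also insert `.suppress(Suppressed.untilWindowCloses(...))` before `toStream()` to emit only one final result per window, per the "window final results" link in the comments.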

Dmitry Minkovsky
  • Thanks, I don't want to turn it into a windowed stream; I most likely just want to increase the commit interval. So I assume you are referring to `commit.interval.ms`; are there other settings I should have in mind? Because now the default is 30s, but even though it spits out values every 30 seconds, I see the same key a few times - any idea why? – eddyP23 Feb 01 '18 at 14:57
    Yep: `commit.interval.ms`. Also see https://stackoverflow.com/a/44737150/741970, where it says `cache.max.bytes.buffering` is also related. Not sure why you're seeing the same key a few times. I would debug that by (i) breaking everything relevant apart and inserting logs; (ii) disabling caches (you have to specify your own `StateStoreSupplier` with caching disabled) so you see all updates constantly. – Dmitry Minkovsky Feb 01 '18 at 15:05
  • If the auto commit time does not align perfectly with your windows (not sure if that's possible), you would commit during a window, which means you would get an output record during the window and then another one at the next commit if there were more records to aggregate in that window. – shaddow Feb 05 '18 at 19:42