43

What I'd like to do is this:

  1. Consume records from a numbers topic (`Long` values)
  2. Aggregate (count) the values for each 5 sec window
  3. Send the FINAL aggregation result to another topic

My code looks like this:

KStream<String, Long> longs = builder.stream(
            Serdes.String(), Serdes.Long(), "longs");

// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
            longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
          .to("long-counts");

It looks like everything works as expected, but the aggregations are sent to the destination topic for every incoming record. My question is: how can I send only the final aggregation result of each window?

Yassin Hajaj
odavid

3 Answers

34

Update 2

With KIP-825 (Apache Kafka 3.3), you can specify an "emit strategy" via `windowedBy(...)`. The default is `EMIT_EAGER`, but you can also specify `EMIT_FINAL` to get only a single result per key and window when a window closes (i.e., at the point window-end + grace-period).
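
A minimal sketch of what this could look like for the question's 5-second count, assuming Kafka Streams 3.3+ and the `EmitStrategy#onWindowClose()` API added by KIP-825 (the one-second grace period is an arbitrary choice; topic names follow the question):

// Sketch for Kafka Streams 3.3+ (KIP-825): emit only the final result per window.
StreamsBuilder builder = new StreamsBuilder();

builder.stream("longs", Consumed.with(Serdes.String(), Serdes.Long()))
       .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
       .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(5), Duration.ofSeconds(1)))
       // emit one record per key and window only when the window closes
       .emitStrategy(EmitStrategy.onWindowClose())
       .count()
       .toStream((windowedKey, count) -> windowedKey.key())
       .to("long-counts", Produced.with(Serdes.String(), Serdes.Long()));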

Update 1

With KIP-328 (Apache Kafka 2.1), a `KTable#suppress()` operator was added that allows suppressing consecutive updates in a strict manner and emitting a single result record per window; the tradeoff is increased latency.
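
For the question's count this could look roughly as follows (a sketch assuming Kafka Streams 2.1+; the one-second grace period is an arbitrary choice and topic names follow the question):

builder.stream("longs", Consumed.with(Serdes.String(), Serdes.Long()))
       .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
       .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).grace(Duration.ofSeconds(1)))
       .count()
       // hold back intermediate updates; emit one final record per window when it closes
       .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
       .toStream((windowedKey, count) -> windowedKey.key())
       .to("long-counts", Produced.with(Serdes.String(), Serdes.Long()));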

Original Answer

In Kafka Streams there is no such thing as a "final aggregation". Windows are kept open all the time to handle out-of-order records that arrive after the window end time has passed. However, windows are not kept forever; they get discarded once their retention time expires. There is no special action taken when a window gets discarded.

See Confluent documentation for more details: http://docs.confluent.io/current/streams/
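
For reference, the retention period is a setting of the window store. A sketch of how it can be configured with the current API (the store name and the 10-minute value are arbitrary; retention must be at least window size plus grace period):

// Retain each window for 10 minutes before it is discarded from the store.
KTable<Windowed<String>, Long> counts = longs
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
        .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("long-counts-store")
                           .withRetention(Duration.ofMinutes(10)));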

Thus, for each update to an aggregation, a result record is produced (because Kafka Streams also updates the aggregation result for out-of-order records). Your "final result" would be the latest result record (before a window gets discarded). Depending on your use case, manual de-duplication would be a way to resolve the issue (using the lower-level Processor API via `transform()` or `process()`).
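
As a rough illustration of that manual de-duplication (a sketch only, written against today's Processor API rather than the old `transform()`; the class name, the "dedup-buffer" store, and the 5-second punctuation interval are illustrative, and registering the store and attaching the processor to the topology are omitted): buffer the latest count per window in a key-value store and flush it periodically with a wall-clock punctuation.

// Illustrative sketch: forward only the latest aggregate per window, on a timer.
public class WindowDedupProcessor implements Processor<Windowed<String>, Long, String, Long> {

    private ProcessorContext<String, Long> context;
    private KeyValueStore<String, Long> buffer;

    @Override
    public void init(final ProcessorContext<String, Long> context) {
        this.context = context;
        this.buffer = context.getStateStore("dedup-buffer");
        // every 5 seconds, flush whatever was buffered and clear the buffer
        context.schedule(Duration.ofSeconds(5), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            final List<KeyValue<String, Long>> buffered = new ArrayList<>();
            try (KeyValueIterator<String, Long> it = buffer.all()) {
                it.forEachRemaining(buffered::add);
            }
            for (final KeyValue<String, Long> entry : buffered) {
                context.forward(new Record<>(entry.key, entry.value, timestamp));
                buffer.delete(entry.key);
            }
        });
    }

    @Override
    public void process(final Record<Windowed<String>, Long> record) {
        // keep only the latest count seen for this key and window instance
        final String windowKey = record.key().key() + "@" + record.key().window().start();
        buffer.put(windowKey, record.value());
    }
}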

This blog post might help, too: https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html

Another blog post addressing this issue without using punctuations: http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html

Matthias J. Sax
  • 1
    Also: An upcoming feature of Kafka Streams will provide you with a config option (a buffer/cache whose size you can configure) to control the downstream/output data rate of Kafka Streams. If you set a larger buffer size, more downstream updates will be coalesced and thus the downstream rate will be lowered. – miguno Aug 15 '16 at 17:33
  • This is unfortunate. Flink gives the output of the window at the end of each 'cycle'. The kafka implementation makes it sound like we need an external timer process to dump the KTable after each window is finished. – ethrbunny Aug 23 '16 at 16:06
  • Kafka Streams takes a different approach than Flink. Basically, the output stream is the changelog of the result, i.e., it contains every intermediate result, too. However, Kafka will also have a "queryable state" feature and might add something like triggers to change the current KTable -- but that is not fixed. Kafka Streams is still new :) – Matthias J. Sax Aug 23 '16 at 18:01
  • Are there any events produced when a window gets to the end? Some way to dump a whole KTable at once? – ethrbunny Aug 23 '16 at 20:04
  • Currently not. Because Kafka Streams is able to handle out-of-order data, windows are never closed (until their retention period is over; and they get simply deleted). – Matthias J. Sax Aug 23 '16 at 20:55
  • 1
    As a (very belated) follow up: it would be super useful to be able to register a callback for window state notifications. Create, delete, etc. You have callbacks for the statestore - just continue the trend! – ethrbunny Mar 30 '17 at 14:08
  • Would you mind creating a JIRA for this feature request? :) – Matthias J. Sax Mar 30 '17 at 17:19
  • It might make sense to update this answer to include the fact that 1.0.0 supports wall clock punctuation, which should allow you to periodically check recently "closed" windows. – Dmitry Minkovsky Dec 18 '17 at 19:17
  • I just implemented my above suggestion. Added a processor to the topology that resulted from `StreamBuilder.build()`. I'm pretty satisfied with the result except PAPI processors seem to require parent source(s). Otherwise they don't initialize/run. In this case I don't need a parent source since the processor exists only to wall-clock punctuate. So it's hacked, but okay. – Dmitry Minkovsky Dec 18 '17 at 20:52
  • 1
    It is definitely a hack... :D – Matthias J. Sax Dec 18 '17 at 23:23
  • I just want everything to emanate from and wind up in my Kafka Streams application. Is that too much to ask!?!? :) – Dmitry Minkovsky Dec 19 '17 at 16:13
  • Well. Stop your application, reset it, restart it. It's possible to do. And by "stop your application" I mean call `KafkaStreams#close()` -- no need to stop the whole JVM. – Matthias J. Sax Dec 19 '17 at 17:11
  • 3
    Do you have this implementation available for us mortals, Dimitry? :) – SwissArmyKnife Mar 01 '18 at 08:57
  • @MatthiasJ.Sax If a stream uses exactly_once semantics and does (no time windows) `stream.groupBy(key).aggregate(..).toStream().to(myTopic)` will myTopic receive an aggregate for each event that is consumed and aggregated? From my experience this used to be the case for kafka v1.1.x, but now in v2.0.1 that we have upgraded this is not the case any more. – Vassilis Mar 28 '19 at 16:32
  • @MatthiasJ.Sax After some investigation, it seems that to always have a 1-1 relationship, you have to disable stream caching (`cache.max.bytes.buffering` = 0). Kafka version is not related, while EOS kind of helps as it sets `commit.interval.ms`=1ms. If you could verify my understanding it would be great :-) – Vassilis Mar 28 '19 at 17:35
  • That sounds correct. (note, that commit.interval.ms is set to 100ms if you enable EOS). However, the commit interval does not provide any guarantee that you see all updates -- setting cache size to zero is the right approach (note, that you can also disable caching on a per-store basis instead, via `Materialized` parameter). – Matthias J. Sax Apr 05 '19 at 18:30
7

Starting with Kafka Streams 2.1, you can achieve this using `suppress()`.

Here is an example from the Apache Kafka Streams documentation that sends an alert when a user has had fewer than three events in an hour:

// Note: the documentation example uses static imports of
// java.time.Duration.ofMinutes and Suppressed.BufferConfig.unbounded.
KGroupedStream<UserId, Event> grouped = ...;
grouped
  .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
  .count()
  // hold back all updates until the window (plus grace period) has closed
  .suppress(Suppressed.untilWindowCloses(unbounded()))
  .filter((windowedUserId, count) -> count < 3)
  .toStream()
  .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));

As mentioned in Update 1 of the answer above, be aware of the latency tradeoff. Moreover, note that suppress() is based on event time, so final results are only emitted once new records advance the stream time past the window end plus grace period.

0

I faced the same issue and solved it by adding grace(0) after the fixed window and using the Suppressed API:

// Note: String() below is a static import of Serdes.String(); unbounded() is
// Suppressed.BufferConfig.unbounded().
public void process(KStream<SensorKeyDTO, SensorDataDTO> stream) {
    buildAggregateMetricsBySensor(stream)
            .to(outputTopic, Produced.with(String(), new SensorAggregateMetricsSerde()));
}

private KStream<String, SensorAggregateMetricsDTO> buildAggregateMetricsBySensor(KStream<SensorKeyDTO, SensorDataDTO> stream) {
    return stream
            // re-key by sensor id
            .map((key, val) -> new KeyValue<>(val.getId(), val))
            .groupByKey(Grouped.with(String(), new SensorDataSerde()))
            // fixed window with no grace period, so the window closes at its end time
            .windowedBy(TimeWindows.of(Duration.ofMinutes(WINDOW_SIZE_IN_MINUTES)).grace(Duration.ofMillis(0)))
            .aggregate(SensorAggregateMetricsDTO::new,
                    (String k, SensorDataDTO v, SensorAggregateMetricsDTO va) -> aggregateData(v, va),
                    buildWindowPersistentStore())
            // emit a single, final result per window once it closes
            .suppress(Suppressed.untilWindowCloses(unbounded()))
            .toStream()
            // drop the window wrapper and keep the plain key
            .map((key, value) -> KeyValue.pair(key.key(), value));
}

private Materialized<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>> buildWindowPersistentStore() {
    return Materialized
            .<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>>as(WINDOW_STORE_NAME)
            .withKeySerde(String())
            .withValueSerde(new SensorAggregateMetricsSerde());
}


Sergio Sánchez Sánchez