I have a KStream in which I want to count some dimension of the events. I do it as follows:

KTable<Windowed<Long>, Counter> ret = input.groupByKey()
  .windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
  .aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()));

I want to have a new KStream with those aggregations as events. I can do it easily like this:

ret.toStream().to("output");

The problem is that every event in the "input" topic produces an event in the "output" topic. I would like to publish an event to the output topic only when a window has finished. For example, if the window is one minute long, I want a single event per key per minute.

I think I can do it like this:

ret.toStream().foreach((k, v) -> sendToKafkaTopic("output"));

But I wonder if there's a better / more elegant way of doing this?

Jacek Laskowski
user1028741

1 Answer

You can use the KTable.suppress feature, which is new in version 2.1.

This method allows you to get exactly one final result per window/key for windowed computations.

More about suppress in KIP-328.

You can update your implementation with suppress like this:

KTable<Windowed<Long>, Counter> ret = input.groupByKey()
        .windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
        .aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()))
        .suppress(untilWindowCloses(BufferConfig.unbounded()));

ret.toStream().to("output"); // now stream should flush events to the output topic only when the window closes
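
To make the timing behind "until the window closes" more concrete, here is a small toy model in plain Java. It is not Kafka Streams code and does not use its API; the class `WindowCloseSketch`, its `process` method, and the `key@windowStart=sum` output format are made up for illustration. It assumes tumbling windows, non-negative timestamps, and a window that counts as closed once stream time (the highest timestamp seen so far) reaches window end plus the grace period.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only (hypothetical names, NOT the Kafka Streams implementation).
// It holds back windowed sums and releases them once the window has closed.
final class WindowCloseSketch {
    private final long windowSizeMs;
    private final long graceMs;
    private long streamTimeMs = Long.MIN_VALUE; // highest timestamp seen so far
    // window start -> (key -> running sum), sorted by window start
    private final TreeMap<Long, Map<String, Long>> open = new TreeMap<>();

    WindowCloseSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Adds one event and returns the final results of windows that just closed. */
    List<String> process(String key, long value, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);

        // Accept the event only if its window is still open; otherwise drop it as late.
        if (windowStart + windowSizeMs + graceMs > streamTimeMs) {
            open.computeIfAbsent(windowStart, s -> new TreeMap<>())
                .merge(key, value, Long::sum);
        }

        // Emit every window whose end + grace is at or before stream time.
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, Map<String, Long>>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> window = it.next();
            long closeTimeMs = window.getKey() + windowSizeMs + graceMs;
            if (closeTimeMs > streamTimeMs) {
                break; // windows are sorted, so all later ones are still open
            }
            for (Map.Entry<String, Long> kv : window.getValue().entrySet()) {
                emitted.add(kv.getKey() + "@" + window.getKey() + "=" + kv.getValue());
            }
            it.remove();
        }
        return emitted;
    }
}
```

In this model a window's result only comes out when a later event moves stream time past the close time. Kafka Streams' suppress likewise relies on stream time, which advances only as new records arrive, so a quiet input topic can delay the final result.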
Stefan Repcek
  • This looks like a great solution to what I've been looking for – user1028741 Dec 27 '18 at 21:27
  • It seems that aggregation with `suppress(..)` does not propagate messages at all if the `grace(..)` method is not invoked on `TimeWindows` – Vasyl Sarzhynskyi Jan 01 '19 at 15:05
  • I confirm what Vasiliy Sarzhynskyi observed: if `grace()` is not specified, no events are emitted at all. I had to set up a configuration like this: `.windowedBy(TimeWindows.of(Duration.ofMillis(1000)).grace(Duration.ofMillis(5000)))` – Massimo Da Ros Jun 05 '19 at 19:49