
I'm starting to use KStream to consume data from an existing topic.

I'm only interested in the last event for a given ID within each 10-second window. I tried the following code:

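StreamsBuilder builder = new StreamsBuilder();
KStream<String, MySale> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), specificAvroSerde));

stream.selectKey((key, value) -> value.getID())
    // selectKey() forces a repartition, so pass serdes explicitly instead of relying on defaults
    .groupByKey(Grouped.with(Serdes.String(), specificAvroSerde))
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    .reduce((value1, value2) -> value2)                   // keep the latest value per key and window
    .toStream((windowedKey, value) -> windowedKey.key())  // unwrap Windowed<String> so Serdes.String() fits
    .peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));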

But I end up getting all of the events and not just the last one. Is it possible to do what I want using KStream?
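Why every event shows up: reduce() on a windowed stream returns a KTable<Windowed<K>, V>, and that KTable forwards a new result downstream each time the value for a window changes. Record caching (tuned by cache.max.bytes.buffering and commit.interval.ms) may merge some consecutive updates, but it does not guarantee a single record per window. The sketch below is a plain-Java model of that behaviour, not Kafka Streams code; the class, method and record names are hypothetical, and it assumes 10-second tumbling windows with caching disabled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model (not Kafka Streams code) of the question's topology:
// windowedBy(10 s tumbling).reduce((v1, v2) -> v2) with record caching disabled.
final class WindowedReduceModel {
    record Event(String id, String value, long timestampMs) {}

    static final long WINDOW_SIZE_MS = 10_000L;
    static final BinaryOperator<String> REDUCER = (value1, value2) -> value2;

    // Tumbling windows are aligned to the epoch: a record's window starts at ts - (ts % size).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Returns one "id@windowStart=value" line per input record: the KTable update
    // that would be forwarded downstream for that record.
    static List<String> run(List<Event> events) {
        Map<String, String> store = new HashMap<>(); // stands in for the window store
        List<String> forwarded = new ArrayList<>();
        for (Event e : events) {
            String windowKey = e.id() + "@" + windowStart(e.timestampMs());
            // Map.merge stores the first value as-is and applies the reducer afterwards,
            // matching how reduce() seeds each window with its first record.
            String updated = store.merge(windowKey, e.value(), REDUCER);
            forwarded.add(windowKey + "=" + updated); // every update is forwarded
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("A", "a1", 1_000L),
            new Event("A", "a2", 4_000L),
            new Event("A", "a3", 9_000L),
            new Event("A", "a4", 12_000L));
        run(events).forEach(System.out::println);
    }
}
```

Running main prints three updates for window A@0 (a1, a2, then a3) and one for A@10000, rather than a single result per window, which matches the behaviour described above.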

Val Bonn
Gruutak
  • Hi. I'm not an expert in windowing, but I think you can take advantage of the suppress method, as explained here: https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results – Val Bonn Feb 13 '19 at 11:00
  • @ValBonn That worked. Thank you! – Gruutak Feb 13 '19 at 17:20

1 Answer


Use .suppress()

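It suppresses the intermediate updates for each window and emits only the final result, once the window closes. A window closes when stream time (the highest record timestamp seen so far) passes the window end plus the grace period. Set the grace period explicitly: for TimeWindows.of(...) the default is about 24 hours, which would delay every final result by roughly a day. Also, because stream time only advances when new records arrive, the final result for a window is emitted only after a later record shows up.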

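// Requires: import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
stream.selectKey((key, value) -> value.getID())
    .groupByKey(Grouped.with(Serdes.String(), specificAvroSerde))
    // grace(Duration.ZERO): close each window as soon as stream time passes its end
    // (on Kafka 3.0+, TimeWindows.ofSizeWithNoGrace(...) replaces of(...).grace(...))
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ZERO))
    .reduce((value1, value2) -> value2)
    .suppress(Suppressed.untilWindowCloses(unbounded()))  // emit only the final result per window
    .toStream((windowedKey, value) -> windowedKey.key())  // unwrap Windowed<String> to String
    .peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));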
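To see what untilWindowCloses changes, here is a plain-Java model of the suppressed pipeline; it is not Kafka Streams code, and all class, method and record names are hypothetical. It assumes 10-second tumbling windows and a zero grace period (GRACE_MS), and it approximates Kafka's rule that a window closes once stream time reaches the window end plus the grace period, after which late records for that window are dropped.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Kafka Streams code) of
//   windowedBy(10 s tumbling, grace 0).reduce((v1, v2) -> v2).suppress(untilWindowCloses(unbounded()))
// The latest value per (id, window) is buffered and emitted exactly once, after
// stream time (the highest timestamp seen so far) reaches window end + grace.
final class SuppressModel {
    record Event(String id, String value, long timestampMs) {}
    record WindowKey(String id, long start) {}

    static final long WINDOW_SIZE_MS = 10_000L;
    static final long GRACE_MS = 0L; // assumption: matches grace(Duration.ZERO)

    static List<String> run(List<Event> events) {
        Map<WindowKey, String> buffer = new LinkedHashMap<>(); // unbounded suppression buffer
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (Event e : events) {
            streamTime = Math.max(streamTime, e.timestampMs());
            long start = e.timestampMs() - (e.timestampMs() % WINDOW_SIZE_MS);
            if (start + WINDOW_SIZE_MS + GRACE_MS > streamTime) {
                // Window still open: the reducer (v1, v2) -> v2 keeps the newest value.
                buffer.put(new WindowKey(e.id(), start), e.value());
            } // else: the record arrived after its window closed and is dropped
            // Emit and evict every buffered window that stream time has now closed.
            Iterator<Map.Entry<WindowKey, String>> it = buffer.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<WindowKey, String> entry = it.next();
                WindowKey w = entry.getKey();
                if (w.start() + WINDOW_SIZE_MS + GRACE_MS <= streamTime) {
                    emitted.add(w.id() + "@" + w.start() + "=" + entry.getValue());
                    it.remove();
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("A", "a1", 1_000L),
            new Event("A", "a2", 4_000L),
            new Event("B", "b1", 5_000L),
            new Event("A", "a3", 9_000L),
            new Event("A", "a4", 12_000L), // stream time passes 10 s: closes A@0 and B@0
            new Event("B", "b2", 3_000L),  // late for the already-closed B@0: dropped
            new Event("A", "a5", 21_000L)  // closes A@10000; A@20000 stays buffered
        );
        run(events).forEach(System.out::println);
    }
}
```

Running main prints A@0=a3, B@0=b1 and A@10000=a4: one result per window. The value a5 stays in the buffer because no later record advances stream time past 30 s.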

You can read more here: https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results

Nishu Tayal