I'm starting to use KStream to consume data from an existing topic.
I'm only interested in getting the last event for a given ID within a 10-second window. I tried the following code:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, MySale> stream =
    builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), specificAvroSerde));

stream.selectKey((key, value) -> value.getID())
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    .reduce((value1, value2) -> value2)
    .toStream()
    .peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));
But I end up getting all of the events and not just the last one. Is it possible to do what I want using KStream?
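
To make the goal concrete, here is a small plain-Java sketch (no Kafka involved) of the result I'm after. `Event` is a hypothetical stand-in for `MySale`, and I'm assuming tumbling 10-second windows aligned to the epoch and based on the event timestamp:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LastPerWindow {
    static final long WINDOW_MS = 10_000L;

    // Hypothetical stand-in for MySale: an ID, an event timestamp, and a payload.
    static final class Event {
        final String id;
        final long timestampMs;
        final String payload;

        Event(String id, long timestampMs, String payload) {
            this.id = id;
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    // Keeps only the last event seen for each (id, window start) pair.
    static Map<String, Event> lastPerWindow(List<Event> events) {
        Map<String, Event> result = new LinkedHashMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % WINDOW_MS);
            // A later event in the same window overwrites the earlier one.
            result.put(e.id + "@" + windowStart, e);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("A", 1_000L, "a1"),
            new Event("A", 4_000L, "a2"),
            new Event("B", 5_000L, "b1"),
            new Event("A", 12_000L, "a3"));
        lastPerWindow(events)
            .forEach((key, e) -> System.out.println(key + " -> " + e.payload));
    }
}
```

For these four sample events I would expect three output records (`a2` and `b1` for the first window, `a3` for the second), whereas my topology forwards every incoming event.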