
I have the following piece of code to aggregate data hourly based on event time:

```java
KStream<Windowed<String>, SomeUserDefinedClass> windowedResults = inputStream
    .groupByKey(Grouped.with(Serdes.String(), new SomeUserDefinedSerde<>()))
    .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ofMinutes(15)))
    .aggregate(
        // do some aggregation
    )
    .suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(75), Suppressed.BufferConfig.unbounded()))
    .toStream();
```

The issue is that the window never closes and the results are never emitted unless I receive new data with the same key and a timestamp later than the time limit plus the grace period.

I would like to know what alternatives I can use to ensure the window is closed and the data is emitted once a given amount of time has passed (without waiting for any new data for the same key).

Is there an option/feature to base the `untilTimeLimit` parameter on real (wall-clock) time rather than event time?

Note: This question is not about why a `TimeWindow` is not closed, but about how to close it in the absence of new data.

E Yeoh
  • Possible duplicate of [Kafka Stream Suppress session-windowed-aggregation](https://stackoverflow.com/questions/54222594/kafka-stream-suppress-session-windowed-aggregation); see in particular [this answer](https://stackoverflow.com/questions/54222594/kafka-stream-suppress-session-windowed-aggregation/54226977#54226977) – Bartosz Wardziński Feb 26 '19 at 17:31
  • It's not the same question. This question asks for alternatives to solve the issue (not why it is happening). The same keys do not always arrive every hour, so I need a mechanism to emit the results by a certain time instead of waiting a very long time (or forever). – E Yeoh Feb 26 '19 at 17:43
  • You can use the Processor API. Using a `Punctuator` you can schedule a periodic _job_ to perform some work, e.g. passing the aggregation result forward (a minimal sketch follows this comment thread) - https://kafka.apache.org/21/documentation/streams/developer-guide/processor-api.html – Bartosz Wardziński Feb 26 '19 at 17:47
  • You don't need a new record per key, but only per partition. Does this help? If there is no data at all, you could maybe use "dummy" keys (keys that do not appear in your data) and make sure to send one record per partition to advance time. – Matthias J. Sax Feb 26 '19 at 23:18
  • @wardziniak Using a punctuator is possible, but it's not an efficient solution, as there are a lot of keys to iterate over to decide which have expired and need forwarding. – E Yeoh Feb 27 '19 at 10:02
  • @MatthiasJ.Sax Yes, that helps, as I don't need to maintain a list of unexpired keys. I think the best solution, as you mentioned, is to use dummy keys to advance the time for each partition and trigger the data to be sent downstream. It's more of a workaround, but I think it is the best solution for this issue. – E Yeoh Feb 27 '19 at 10:07
  • @MatthiasJ.Sax Is it necessary for me to implement my own `Partitioner` interface to ensure I can send the dummy messages to each partition? Otherwise there is no guarantee that each partition will get the dummy messages required to advance the time. – E Yeoh Mar 01 '19 at 12:34
  • Hmmm... I guess it depends on how you inject the messages. If you use a regular producer, you can specify the partition number explicitly on `send()` -- in that case, the configured partitioner is ignored (see the second sketch below the comments). – Matthias J. Sax Mar 01 '19 at 23:51
  • @MatthiasJ.Sax Is there any new feature since 2019 that covers this scenario? I face the same issue, but in my case it is prohibited to put dummy messages into the original topic. I could re-stream the original topic to a new one where I can put messages myself, but since the topic is huge, this looks like unwanted overhead. – vyacheslav.kislov Oct 14 '22 at 13:22
  • Nothing has changed so far. Stream time (which closes a window) only advances if there is new data. – Matthias J. Sax Oct 18 '22 at 00:07
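
For reference, the `Punctuator` approach suggested in the comments could look roughly like the following. This is a minimal sketch, assuming Kafka Streams 2.1+; the `WallClockEmitter` class and the `pending-windows` store name are hypothetical (only `SomeUserDefinedClass` comes from the question), and `lastCutoff` is kept in memory only, so a real implementation would persist it to avoid re-emitting windows after a restart.

```java
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;

public class WallClockEmitter implements
        Transformer<Windowed<String>, SomeUserDefinedClass, KeyValue<Windowed<String>, SomeUserDefinedClass>> {

    private static final Duration WINDOW_SIZE = Duration.ofMinutes(60);
    private static final Duration GRACE = Duration.ofMinutes(15);

    private ProcessorContext context;
    private WindowStore<String, SomeUserDefinedClass> pending;
    private Instant lastCutoff = Instant.EPOCH; // in-memory only; persist it in a real implementation

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.pending = (WindowStore<String, SomeUserDefinedClass>) context.getStateStore("pending-windows");
        // Fires every minute on wall-clock time, even if no new records arrive.
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, now -> {
            // Windows that started more than (window size + grace) ago can be treated as closed.
            final Instant cutoff = Instant.ofEpochMilli(now).minus(WINDOW_SIZE).minus(GRACE);
            try (final KeyValueIterator<Windowed<String>, SomeUserDefinedClass> iter =
                         pending.fetchAll(lastCutoff, cutoff)) {
                while (iter.hasNext()) {
                    final KeyValue<Windowed<String>, SomeUserDefinedClass> entry = iter.next();
                    context.forward(entry.key, entry.value);
                }
            }
            lastCutoff = cutoff.plusMillis(1); // don't re-emit the same windows on the next run
        });
    }

    @Override
    public KeyValue<Windowed<String>, SomeUserDefinedClass> transform(
            final Windowed<String> key, final SomeUserDefinedClass value) {
        // Buffer the latest aggregate per key and window; emission happens only in the punctuator.
        pending.put(key.key(), value, key.window().start());
        return null;
    }

    @Override
    public void close() { }
}
```

It would be wired in downstream of the aggregation with `suppress` removed, e.g. `windowedResults.transform(WallClockEmitter::new, "pending-windows")`, after registering a window store under that name via `StreamsBuilder#addStateStore`; the store's retention must cover the window size plus grace plus the punctuation interval.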

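The dummy-key workaround discussed above boils down to a small scheduled side producer that sends one record with a fresh timestamp to every partition of the input topic. Another minimal sketch; the topic name, the `advance-time` key, and the String serializers are assumptions, and the record timestamp must come from the same clock as the real event timestamps.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;

public class StreamTimeAdvancer {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        final String topic = "input-topic"; // the topic feeding the windowed aggregation

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            final List<PartitionInfo> partitions = producer.partitionsFor(topic);
            final long now = System.currentTimeMillis();
            for (final PartitionInfo p : partitions) {
                // Specifying the partition explicitly bypasses the configured partitioner,
                // so every partition is guaranteed to receive a record that advances stream time.
                producer.send(new ProducerRecord<>(topic, p.partition(), now, "advance-time", ""));
            }
            producer.flush();
        }
    }
}
```

Note that the dummy records have to flow through the aggregation and `suppress` to advance stream time there, so they should be filtered out only after `toStream()`, e.g. `windowedResults.filterNot((k, v) -> "advance-time".equals(k.key()))`.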
0 Answers