14

I have written this code in a Kafka Streams application:

KGroupedStream<String, foo> groupedStream = stream.groupByKey();

groupedStream
    .windowedBy(SessionWindows.with(Duration.ofSeconds(3)).grace(Duration.ofSeconds(3)))
    .aggregate(() -> {...})
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream()...

which should (if I understood it correctly) emit one record per key after the window has closed. However, the actual behavior is the following:

The stream doesn't emit the first record; it only forwards it after the second record arrives (even one with a different key), and the second record is then emitted only after the third, and so forth.

I have tried multiple StreamsConfig settings, with "exactly_once" enabled and with caching turned on and off, but the behavior persists.
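For reference, a minimal, self-contained sketch of the topology described above (this is not the asker's original code: it assumes plain String values in place of `foo`, a simple string-concatenation aggregate, and made-up topic names `input-topic` and `output-topic`):

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.state.SessionStore;

public class SessionSuppressTopology {

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> stream =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
              // 3s inactivity gap and 3s grace period, as in the question
              .windowedBy(SessionWindows.with(Duration.ofSeconds(3)).grace(Duration.ofSeconds(3)))
              .aggregate(
                      () -> "",                           // initializer
                      (key, value, agg) -> agg + value,   // aggregator
                      (key, agg1, agg2) -> agg1 + agg2,   // session merger
                      Materialized.<String, String, SessionStore<Bytes, byte[]>>with(
                              Serdes.String(), Serdes.String()))
              // hold back updates until window end + grace period has passed (in stream time)
              .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
              .toStream()
              // unwrap the windowed key before writing the final result out
              .map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg))
              .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder;
    }
}

Note that aggregate() on a session-windowed stream needs a session merger in addition to the initializer and aggregator, because two sessions can be merged into one when a record bridges the inactivity gap.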

Thanks in advance for your help!

Stanislav Kralin
dborn
  • If you want your data aggregated by period of time and not by "session", I guess you need to use `TimeWindows` instead of `SessionWindows`. – Vasyl Sarzhynskyi Jan 16 '19 at 19:20
  • That did not work for me. Have a timed window, but it still does not complete the suppression effect on old windows until new events are added for the same key. Very frustrating AND counter-intuitive! – BalRog Jul 10 '19 at 19:56

2 Answers

18

That is expected behavior. Note that suppress() is based on event time. As long as no new data arrives, stream time cannot advance, so evicting the record earlier would be wrong: there is no guarantee that the next record won't still belong to the current window.
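To make this concrete, here is a minimal sketch (not part of the original answer; it assumes Kafka Streams 2.4+ for the TestInputTopic API and uses a simple windowed count() in place of the question's aggregate) showing that the suppressed result is only released once a later record, even one with a different key, pushes stream time past the window end plus the grace period:

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;

public class SuppressStreamTimeDemo {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .windowedBy(SessionWindows.with(Duration.ofSeconds(3)).grace(Duration.ofSeconds(3)))
               .count()
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream()
               .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
               .to("output", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input =
                    driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> output =
                    driver.createOutputTopic("output", new StringDeserializer(), new LongDeserializer());

            // First record opens a session for key "a". Nothing is emitted yet,
            // because stream time has not passed the session end + grace period.
            input.pipeInput("a", "v1", Instant.ofEpochMilli(0));
            System.out.println(output.readKeyValuesToList());   // []

            // A later record (here for a different key) advances stream time well
            // past the first session's end plus the 3s grace period, so the
            // suppressed result for "a" is finally released.
            input.pipeInput("b", "v2", Instant.ofEpochMilli(10_000));
            System.out.println(output.readKeyValuesToList());   // [KeyValue(a, 1)]
        }
    }
}

The second readKeyValuesToList() call returns only the result for key "a"; the session for "b" stays buffered until something later advances stream time again.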

Matthias J. Sax
  • Thank you for the quick answer. The documentation states that "after defining your windowed computation, you can suppress the intermediate results, emitting the final count for each user when the window is closed", so I thought that whether I have a session or a time window, the result would be emitted only after the window is closed, and that I can have multiple windows open at the same time per record key. If that is not the case, what would be the way to achieve such a behavior? – dborn Jan 17 '19 at 09:04
  • I think I understand the behavior now. Thanks a lot! Would it be possible then to trigger the "eviction" of certain records with certain keys using a join with a KStream? – dborn Jan 17 '19 at 14:20
  • All you say in your first comment is true. The point is that the window can only be closed if event time progresses, and that only happens after a new record with a larger timestamp arrives in the input. -- I don't understand your second comment, though. – Matthias J. Sax Jan 17 '19 at 18:40
  • Sorry, I meant to ask: is there a way to emit the data without receiving a new record? Although, according to your first answer, it is not possible. – dborn Jan 18 '19 at 18:30
  • Correct. It's not possible to evict data if no new records arrive -- otherwise the contract of `suppress()` would be violated. I understand that this might be clumsy in a testing scenario; however, a real deployment is stream processing, so it is not an issue. – Matthias J. Sax Jan 19 '19 at 02:19
  • @dborn , did you find a solution or a workaround to implement what you need? – Val Bonn Jun 20 '19 at 12:12
  • @ValBonn I built a workaround: a sort of dummy stream with a Punctuator that performs a periodic operation based on wall-clock time (in order to push stream time forward); see the sketch after these comments. – dborn Jun 21 '19 at 13:05
  • @dborn Can you please post sample code of that punctuator? Thanks. – Michal Hlaváč Sep 02 '19 at 16:20
  • @ValBonn Can you please post the punctuator code? It would be quite helpful for others like me who are facing a similar issue, since in a CDC architecture one cannot expect a new update for each and every upstream DB table. – Ganesh Mar 30 '21 at 06:13
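For readers looking for the punctuator workaround mentioned above: dborn's actual code was never posted, but the following is a rough sketch of the idea under stated assumptions (the class name, the `__tick__` key, and the one-second interval are all made up). A pass-through transformer is placed on the input stream and, on a wall-clock schedule, forwards a synthetic record stamped with the current wall-clock time, so that stream time keeps advancing even while the topic is idle:

import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.To;

// Pass-through transformer that also emits a synthetic "tick" record once per
// second of wall-clock time, so that downstream stream time keeps advancing
// even while the input topic is idle.
public class StreamTimeTicker implements Transformer<String, String, KeyValue<String, String>> {

    public static final String TICK_KEY = "__tick__";   // made-up reserved key

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // Wall-clock punctuation fires regardless of whether records arrive.
        this.context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp ->
                // Forward a dummy record stamped with the current wall-clock time;
                // this is what pushes stream time forward for the suppress() buffer.
                this.context.forward(TICK_KEY, "", To.all().withTimestamp(timestamp)));
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        return KeyValue.pair(key, value);   // real records pass through unchanged
    }

    @Override
    public void close() { }
}

It would be wired in as, for example, stream.transform(StreamTimeTicker::new) before the groupByKey(), with the tick key filtered out again downstream of the suppress(); in a multi-partition setup a tick would additionally have to reach every partition, which this sketch glosses over.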
-1

I do not think a session window combined with suppress() will generate any output.

Correct me if I am wrong, but to my knowledge suppress() works only with time-based windows and does not work with session-based windows.

RaAm
  • suppress() works with SessionWindows -- you need to make sure to define the _grace_ period; otherwise a default of (24h - gapMs) applies and the window will therefore '_not close_' for that period. Also see _org.apache.kafka.streams.kstream.SessionWindows#gracePeriodMs_ || example: `.windowedBy(SessionWindows.with(Duration.ofSeconds(10)).grace(Duration.ofSeconds(10)))` – Hartmut Nov 03 '20 at 08:41