
I have a set of requirements as below:

  1. When message 'T' arrives, wait up to 5 seconds for the corresponding message 'A' (same key). If 'A' arrives within 5 seconds, send the joined values downstream. If it does not, send only the 'T' message downstream.
  2. When message 'A' arrives, wait up to 5 seconds for the corresponding message 'T' (same key). If 'T' arrives within 5 seconds, send the joined values downstream. If it does not, send only the 'A' message downstream.

[Diagram of the join and timeout cases]

My current thinking was to use a KStream-KStream sliding-window outer join. However, that does not wait 5 seconds before sending the unjoined (T, null) or (null, A) message downstream; it emits it immediately.

I need to wait for a timeout to happen, and if a join did not occur, then send the unjoined message through.
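To make the desired behavior concrete, here is a minimal plain-Java model of it. This is only a sketch of the semantics, not Kafka Streams code: the class `TimeoutJoinModel`, its methods, and the string output format are all hypothetical, and it assumes at most one pending message per key and that a partner arriving exactly at the 5 second mark counts as too late.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the required join-with-timeout semantics. */
final class TimeoutJoinModel {
    static final long TIMEOUT_MS = 5_000;

    /** A buffered message that has not found its partner yet. */
    static final class Pending {
        final char side;       // 'T' or 'A'
        final String value;
        final long arrivedAt;  // arrival time in milliseconds

        Pending(char side, String value, long arrivedAt) {
            this.side = side;
            this.value = value;
            this.arrivedAt = arrivedAt;
        }
    }

    private final Map<String, Pending> pending = new HashMap<>();
    private final List<String> downstream = new ArrayList<>();

    /** Handle a message from side 'T' or 'A' that arrives at time nowMs. */
    void onMessage(char side, String key, String value, long nowMs) {
        expire(nowMs); // first flush everything that has already timed out
        Pending other = pending.get(key);
        if (other != null && other.side != side) {
            // Partner arrived in time: emit the joined pair as (T, A).
            pending.remove(key);
            String t = side == 'T' ? value : other.value;
            String a = side == 'A' ? value : other.value;
            downstream.add(key + ":(" + t + "," + a + ")");
        } else {
            // No partner yet. Assumption: a second message from the same
            // side simply replaces the buffered one.
            pending.put(key, new Pending(side, value, nowMs));
        }
    }

    /** Emit, unjoined, every buffered message that has waited TIMEOUT_MS or longer. */
    void expire(long nowMs) {
        Iterator<Map.Entry<String, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> e = it.next();
            Pending p = e.getValue();
            if (nowMs - p.arrivedAt >= TIMEOUT_MS) {
                String t = p.side == 'T' ? p.value : "null";
                String a = p.side == 'A' ? p.value : "null";
                downstream.add(e.getKey() + ":(" + t + "," + a + ")");
                it.remove();
            }
        }
    }

    /** Everything emitted so far, in emission order. */
    List<String> output() {
        return downstream;
    }
}
```

In this model, a 'T' at time 0 followed by an 'A' with the same key at 3000 ms produces one joined record, while a lone 'T' is only emitted once `expire` is called at least 5000 ms after it arrived. The periodic `expire` call plays the role that a timer would play in a real stream processor.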

I've attached a diagram to help make sense of the cases. I am trying to use DSL as much as possible.

Any help appreciated.

Zunair Syed

1 Answer


Okay, I found a fairly hacky solution that I'm still evaluating, but it should work for this scenario.

I can simply `groupByKey()` after the outer join and then `suppress()` the results until the window expires, with an unbounded buffer.

Zunair Syed
  • Interesting solution... How do you set the window size for the aggregation? Note that `TimeWindows` are aligned to the epoch, i.e., if you set the window size to 5 seconds, you get windows `[0,5)`, `[5,10)`, etc. Thus, if T has timestamp 3, the window would already end 2 seconds later (not 5 seconds later). Furthermore, it seems that it might only work if you know that there won't be a second message on the same input topic with the same key within the timeout: if there could be a T2 after T1 before the T1 timeout expires, T2 would overwrite T1 (well, I guess it depends on how you define your `aggregate()`). – Matthias J. Sax Aug 05 '20 at 05:21
  • Bottom line: Kafka Streams should change the semantics of `outerJoin()` and not emit eagerly. -- To build a custom solution, you might want to do the join completely manually. Using punctuation might be useful (even if not strictly necessary to use). – Matthias J. Sax Aug 05 '20 at 05:24
  • For the record, @MatthiasJ.Sax's concern has been addressed and is tagged as "Fix Version 3.1.0", i.e. a future release (currently, we are at 3.0.0-rc2), here: https://issues.apache.org/jira/browse/KAFKA-10847 – Evgeniy Berezovsky Sep 09 '21 at 02:42
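
The epoch alignment mentioned in the first comment can be checked with plain arithmetic. The sketch below is not the Kafka Streams API; `WindowAlignment` and its methods are hypothetical helpers that assume non-negative timestamps in milliseconds and tumbling windows whose boundaries are multiples of the window size.

```java
/** Hypothetical helper illustrating epoch-aligned tumbling windows. */
final class WindowAlignment {
    /** Start of the epoch-aligned window of size sizeMs that contains timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Time left between timestampMs and the end of its window. */
    static long remainingMs(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs - timestampMs;
    }
}
```

With a 5 second window, `WindowAlignment.remainingMs(3_000, 5_000)` is 2000: a record with timestamp 3 s falls into window `[0,5)`, which closes 2 seconds later rather than 5. The wait before the suppressed result is released therefore depends on where the record lands inside its window.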