I want to find the event time difference between every two consecutive input events. If the time difference is above a certain threshold then I want to output an event signalling the threshold has been breached. I also want the first event of the stream to always output this breach signal as an indication that it does not have a previous event to calculate a time difference with.

I tried using Flink's CEP library as it ensures that the events are ordered by event time.

The pattern I created is as follows:

Pattern.begin("begin").optional().next("end");

I use the optional() clause to cater for the first event as I figured the first event would be the only event where "begin" would not have a value.

When I input my events a1 a2 a3 a4 a5 I get the following output matches:

{a1} {a1 a2} {a2} {a2 a3} {a3} {a3 a4} {a4} {a4 a5}...

However I want the following as it will allow me to calculate the time difference between each consecutive event.

{a1} {a1 a2} {a2 a3} {a3 a4} {a4 a5}...

I have tried playing around with different AfterMatchSkipStrategy settings as well as IterativeCondition clauses but with no success.

Dan
  • Are those the only possible inputs and outputs? If not, it's unclear how to generalize from this one example. – David Anderson Aug 15 '22 at 17:38
  • @DavidAnderson my use case is that I want to find the event time difference between every two consecutive input events. If the time difference is above a certain threshold then I want to output an event signalling the threshold has been breached. I want the first event of the stream to output this breach signal as well. – Dan Aug 15 '22 at 18:23

1 Answer

Marking "begin" as optional is what's causing the unwanted matches. I would look for some other way to generate the breach signal for the first event -- e.g., perhaps you could prepend a dummy first event.

Another approach would be to use CEP or SQL only for sorting the stream by event time, and then use a RichFlatMapFunction or a stateful process function to implement the business logic: i.e., compute the differences and generate the breach signals.

See "Can I use Flink CEP to sort a stream?" for how to do this.
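Once the stream is sorted, the business logic itself is small. Here is a minimal sketch of it in plain Java with no Flink dependencies, so it can be unit tested on its own. The class name `GapBreachDetector` and its methods are hypothetical, not part of any Flink API. It assumes timestamps are in milliseconds and that events already arrive in event-time order. In a real job the `previousTimestamp` field would most likely live in keyed `ValueState<Long>` inside a RichFlatMapFunction or KeyedProcessFunction.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Flink class): remembers the previous event time and flags breaches. */
final class GapBreachDetector {
    private final long thresholdMillis;

    // Stand-in for keyed ValueState<Long> in a Flink job; null means "no previous event seen yet".
    private Long previousTimestamp = null;

    GapBreachDetector(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    /**
     * Returns true when this event should emit a breach signal: either it is the
     * first event, or the gap to the previous event exceeds the threshold.
     * Assumes the caller feeds events in event-time order.
     */
    boolean onEvent(long timestampMillis) {
        boolean breach = previousTimestamp == null
                || timestampMillis - previousTimestamp > thresholdMillis;
        previousTimestamp = timestampMillis;
        return breach;
    }

    /** Runs a whole sorted batch through a fresh detector and returns the timestamps that breached. */
    static List<Long> breaches(List<Long> sortedTimestamps, long thresholdMillis) {
        GapBreachDetector detector = new GapBreachDetector(thresholdMillis);
        List<Long> out = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (detector.onEvent(ts)) {
                out.add(ts);
            }
        }
        return out;
    }
}
```

With a threshold of 1000 ms, the timestamps 1000, 1500, 4000, 4200 would signal a breach for 1000 (no previous event) and for 4000 (a gap of 2500 ms), which covers the first-event case without needing optional() in a pattern.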

David Anderson
  • Thanks for the tips. The issue I have with only using RichFlatMapFunction is that there is no guarantee of message order by event time unless I am mistaken. So the first event into the RichFlatMapFunction that sets a ValueState flag may not be the first event ordered by event time. This seems to be the case when I unit test. I'm interested to know more about your second approach to order the stream using CEP and then use a RichFlatMapFunction. – Dan Aug 15 '22 at 21:27
  • Good point re: the first event. I've created an example here: https://stackoverflow.com/questions/73366687/can-i-use-flink-cep-to-sort-a-stream/73366688#73366688 – David Anderson Aug 15 '22 at 21:41