
Scenario:

I have a stream of events coming from the sensors. Each event is either T-type or J-type.

  • T-type events have an event-occurred timestamp.
  • J-type events have a start timestamp and an end timestamp.

For each J-type event, I need to apply aggregation logic to all the T-type events that fall within its start/end time range, and write the result to a DB.

For this, I have created a custom trigger that fires when a J-type event is received. My custom ProcessWindowFunction performs the aggregation and the time-range check.

However, a T-type event may not fall within the time range of the current J-type event. In that case, the T-type event should be carried over to the next window before the current window is purged.
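A minimal, framework-free sketch of what the window function has to do when it fires: aggregate the T-type events covered by the J-type event and hand back the ones that must survive into the next window. It assumes an inclusive [start, end] range and a hypothetical numeric `value` field that is summed; the record names and fields are illustrative, not taken from the question.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event shapes; field names and `value` are illustrative only.
record TEvent(long occurredAt, double value) {}
record JEvent(long start, long end) {}

// Result of closing one window: the aggregate plus the carry-over events.
record WindowResult(double sum, List<TEvent> carryOver) {}

final class WindowLogic {
    private WindowLogic() {}

    // Assumes the J-type range is inclusive on both ends.
    static boolean inRange(JEvent j, TEvent t) {
        return t.occurredAt() >= j.start() && t.occurredAt() <= j.end();
    }

    // Sums the T-type events covered by `j` and collects the rest,
    // which should go to the next window instead of being purged.
    static WindowResult close(JEvent j, List<TEvent> tEvents) {
        double sum = 0;
        List<TEvent> carryOver = new ArrayList<>();
        for (TEvent t : tEvents) {
            if (inRange(j, t)) {
                sum += t.value();
            } else {
                carryOver.add(t);
            }
        }
        return new WindowResult(sum, carryOver);
    }
}
```

The open question in this thread is where `carryOver` should live between firings; the answer below addresses exactly that.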

(Figure: Stream Window)

Solutions I have considered:

  1. Push the unprocessed T-type events back into the Kinesis stream (the source) from the custom window process function. (Worst-case solution.)

  2. Use FIRE instead of FIRE_AND_PURGE, so the state is kept for the whole runtime, and remove processed elements using the elements Iterator. (Not recommended, since it keeps an infinite window.)

I would like to know if there is any way to push the unprocessed events directly back into the input stream (without Kinesis), i.e. re-queuing.

Or

Is there any way to maintain state in the keyed (keyBy) context, so that we can process this unprocessed data before, or along with, the window elements?

Arjun Sunil Kumar
  • Before discussing solutions, I'd like to better understand the problem. Are the T and J events in the same stream? And is that stream keyed -- and if so, by what (maybe by sensorId?)? You explained how there could be two open windows at the same time. How out-of-order can things be? Could there be three or more open windows simultaneously? And does the J event for a window always precede (or follow) the T events that go with it, or is any sort of ordering possible? – David Anderson Feb 20 '20 at 18:31
  • 1. Yes, they are in the same stream. 2. Yes, the stream is keyBy("sensorId"). 3. There won't be two open windows for the same "sensorId" simultaneously. The first window closes after we receive the first J-type event; then the second window opens. Here, the J-type event fires the window trigger. In window 1, we could also receive T-type events that belong to a subsequent window. – Arjun Sunil Kumar Feb 21 '20 at 04:52
  • We associate T-type events with J-type events based on the event_occured timestamp in the T-type events. If the event_occured timestamp falls within the J-type time range, the event belongs to that J-type event. If it doesn't, the T-type event should be pushed to the next window, to check whether it is valid there. – Arjun Sunil Kumar Feb 21 '20 at 04:58
  • Processing engines generally model the pipeline as a directed acyclic graph (DAG): processing flows through functions in a fixed order, and functions can be chained together, but processing never goes back to an earlier point in the graph. Hence, re-queuing is not an option here. I used `ListState` as the solution for my scenario. Reference: https://blog.scottlogic.com/2018/07/06/comparing-streaming-frameworks-pt1.html – Arjun Sunil Kumar Feb 23 '20 at 19:59



Here are two solutions. They are more-or-less equivalent in their underlying behavior, but you might find one or the other easier to understand, maintain, or test.

As for your question, no, there is no way to loop back (re-queue) the unconsumed events without pushing them back to Kinesis. But simply holding on to them until they are needed should be fine.

Solution 1: Use a RichFlatMapFunction

As T-type events arrive, append them to a ListState object. When a J-type event arrives, collect to the output all matching T-type events from the list, and update the list to only retain those T-type events that will belong to later J-type events.
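A framework-free model of Solution 1, so it runs without Flink. A `Map` keyed by `sensorId` stands in for keyed `ListState` (in a real job, `keyBy` scopes the state per sensor and the buffer would be a `ListState<TEvent>` obtained from the runtime context in `open()`), and the returned list stands in for the `Collector`. The record names, the inclusive range, the sum/count aggregation, and the assumption that J-type ranges arrive in increasing time order are all illustrative choices, not part of the answer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event shapes; all field names are illustrative assumptions.
interface SensorEvent {
    String sensorId();
}

record TEvent(String sensorId, long occurredAt, double value) implements SensorEvent {}

record JEvent(String sensorId, long start, long end) implements SensorEvent {}

// One aggregate emitted per J-type event (sum and count are assumptions).
record Aggregate(String sensorId, long start, long end, double sum, int count) {}

final class BufferingAggregator {
    // Stand-in for Flink's keyed ListState<TEvent>: one buffer per sensorId.
    private final Map<String, List<TEvent>> buffers = new HashMap<>();

    // Plays the role of flatMap(): returns what would be sent to the Collector.
    List<Aggregate> process(SensorEvent event) {
        List<TEvent> buffer =
                buffers.computeIfAbsent(event.sensorId(), k -> new ArrayList<>());
        if (event instanceof TEvent tEvent) {
            buffer.add(tEvent); // hold T-type events until a J-type event arrives
            return List.of();
        }
        JEvent j = (JEvent) event;
        double sum = 0;
        int count = 0;
        for (TEvent candidate : buffer) {
            // Assumes the J-type range is inclusive on both ends.
            if (candidate.occurredAt() >= j.start() && candidate.occurredAt() <= j.end()) {
                sum += candidate.value();
                count++;
            }
        }
        // Assumes J-type ranges arrive in increasing time order, so only
        // events after j.end() can still belong to a later J-type event.
        buffer.removeIf(t -> t.occurredAt() <= j.end());
        return List.of(new Aggregate(j.sensorId(), j.start(), j.end(), sum, count));
    }

    // Number of T-type events still buffered for a sensor.
    int buffered(String sensorId) {
        return buffers.getOrDefault(sensorId, List.of()).size();
    }
}
```

Dropping T-type events older than `start` is a policy choice made for this sketch; if such late events are possible in your data, you may prefer to keep them for a bounded time instead.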

Solution 2: Use GlobalWindows with a custom Trigger and Evictor

In addition to what you've already done, implement an Evictor that (after the window has been FIREd) removes only the J-type event and all matching T-type events from the window.
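A sketch of the eviction step from Solution 2, modeled without Flink. In a real job this logic would sit in `evictAfter(...)` of a class implementing Flink's `Evictor`, iterating over the window's `TimestampedValue` elements (unwrapping each with `getValue()`) and calling `remove()` on the iterator; here a plain `List` is used so the example runs on its own. The event records and the assumption that each firing is caused by exactly one J-type event are illustrative.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical event shapes for this sketch.
interface SensorEvent {}

record TEvent(long occurredAt) implements SensorEvent {}

record JEvent(long start, long end) implements SensorEvent {}

final class MatchedEventEvictor {
    private MatchedEventEvictor() {}

    // Models Evictor.evictAfter(): the list stands in for the window's
    // Iterable<TimestampedValue<...>>, whose iterator supports remove().
    static void evictAfterFire(List<SensorEvent> windowContents) {
        // Assumes the trigger fired because a single J-type event arrived.
        JEvent fired = null;
        for (SensorEvent e : windowContents) {
            if (e instanceof JEvent j) {
                fired = j;
            }
        }
        if (fired == null) {
            return; // no J-type event yet, so nothing was fired
        }
        Iterator<SensorEvent> it = windowContents.iterator();
        while (it.hasNext()) {
            SensorEvent e = it.next();
            boolean matchedT = e instanceof TEvent t
                    && t.occurredAt() >= fired.start()
                    && t.occurredAt() <= fired.end();
            if (e == fired || matchedT) {
                it.remove(); // unmatched T-type events stay in the window
            }
        }
    }
}
```

For this to work, the trigger returns FIRE rather than FIRE_AND_PURGE, so the unmatched T-type events remain in the global window for the next firing.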

Update: Clearing State for Stale Keys / Dead Sensors

With solution 1, you can use state TTL to arrange for any inactive state associated with dead keys to be purged. Or you could use a KeyedProcessFunction rather than a RichFlatMapFunction, and use timers to accomplish the same thing.
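A sketch of the timer-based cleanup for Solution 1, modeled without Flink: maps stand in for keyed state, and the caller invokes `onTimer` the way Flink's timer service would. In an actual `KeyedProcessFunction`, you would register the timer with `ctx.timerService().registerProcessingTimeTimer(...)` and call `clear()` on the state. The timeout value and the "only the most recent timer clears state" policy are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StaleKeyCleanup {
    private final long timeoutMillis;
    // Stand-in for keyed ListState holding buffered T-type timestamps.
    private final Map<String, List<Long>> buffers = new HashMap<>();
    // Stand-in for a keyed ValueState<Long> holding the latest timer timestamp.
    private final Map<String, Long> latestTimer = new HashMap<>();

    StaleKeyCleanup(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Like processElement(): buffer the event and push the cleanup time forward.
    // Returns the timestamp a real job would pass to registerProcessingTimeTimer().
    long onEvent(String sensorId, long occurredAt, long now) {
        buffers.computeIfAbsent(sensorId, k -> new ArrayList<>()).add(occurredAt);
        long cleanupAt = now + timeoutMillis;
        latestTimer.put(sensorId, cleanupAt);
        return cleanupAt;
    }

    // Like onTimer(): earlier timers still fire, but only the most recent one
    // clears state, so a sensor that is still active keeps its buffer.
    void onTimer(String sensorId, long timestamp) {
        Long latest = latestTimer.get(sensorId);
        if (latest != null && latest.longValue() == timestamp) {
            buffers.remove(sensorId);
            latestTimer.remove(sensorId);
        }
    }

    boolean hasState(String sensorId) {
        return buffers.containsKey(sensorId);
    }
}
```

State TTL achieves a similar effect through configuration alone; timers give you explicit control, for example to emit a final partial result before clearing.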

Managing state for stale keys with the window API can be less straightforward, but for solution 2 I believe you can extend your custom trigger to include a timeout that will PURGE the window. And if you have used global state in the ProcessWindowFunction, you will need to rely on state TTL to clean that up.
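A sketch of the trigger decisions for Solution 2 with an idle timeout, modeled without Flink. The `Decision` enum is a local mirror of the values of Flink's `TriggerResult`; in a real custom `Trigger`, this logic would live in `onElement` and `onProcessingTime`, with the timer registered via `ctx.registerProcessingTimeTimer(...)` and the pending timestamp kept in partitioned trigger state. The idle-timeout length and the boolean `isJType` parameter are illustrative assumptions.

```java
// Local mirror of the values of Flink's TriggerResult enum, so the sketch runs without Flink.
enum Decision { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

final class JEventTimeoutTrigger {
    private final long idleTimeoutMillis;
    // Stand-in for per-key trigger state (a ValueState<Long> in a real trigger).
    private long pendingTimer = Long.MIN_VALUE;

    JEventTimeoutTrigger(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Like onElement(): FIRE (without purging) on a J-type event, and re-arm
    // the idle timeout on every element.
    Decision onElement(boolean isJType, long now) {
        pendingTimer = now + idleTimeoutMillis;
        return isJType ? Decision.FIRE : Decision.CONTINUE;
    }

    // Like onProcessingTime(): PURGE only when no element arrived after this
    // timer was registered, i.e. the sensor appears to be dead.
    Decision onProcessingTime(long time) {
        return time == pendingTimer ? Decision.PURGE : Decision.CONTINUE;
    }
}
```

Earlier timers still fire after the timeout has been re-armed; comparing against the latest pending timestamp is what keeps an active sensor's window from being purged.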

David Anderson
  • Hey David, thank you for the response. My understanding is that windows are Akka actors. If a sensor dies, its `sensorId` becomes obsolete, so if we don't have a purging strategy for windows, these obsolete actors will keep consuming main memory. – Arjun Sunil Kumar Feb 21 '20 at 13:31
  • Re-queuing is the best solution in our case, since it simplifies the process and reduces corner cases. If there is any approach to send the events back to the input stream directly (without Kinesis), please share it. – Arjun Sunil Kumar Feb 21 '20 at 14:40
  • I was also thinking of having one more side stream (Kinesis or something else), which would receive these invalid elements from evictor() and would be merged with the main input stream using union(). What are your thoughts on that? – Arjun Sunil Kumar Feb 21 '20 at 14:56
  • I quite like solution 1 (either a stateful flatmap or a process function); that's what I would do. – David Anderson Feb 21 '20 at 15:00
  • I don't see how re-queuing solves the problem with dead sensors. You could end up with those last few T-type events endlessly cycling back into Kinesis. – David Anderson Feb 21 '20 at 15:03
  • Thank you for clarifying. I used ListState: `ListState tDataListState = getRuntimeContext().getListState(unprocessedTDataStateDesc); ` in my custom ProcessWindowFunction. – Arjun Sunil Kumar Feb 21 '20 at 18:45