I'm working with IoT, and my devices send data to several Pulsar topics at a rate of around 5 messages per second. I have days' worth of messages on those topics, and I need to process them and save the output in a database.

In my Flink code, after some preprocessing of the source inputs, I reach a point where I need stateful processing, synchronized in event time, over 3 data streams: 2 of them receive records very frequently (about 1 message per second) and the 3rd much less frequently (about 1 message per minute).

I tried simply using `ds1.union(ds2).union(ds3).flatMap(...)`, but noticed there is no guarantee that the records are processed in event-time order. The frequent inputs get far ahead in time of the infrequent one, and this breaks my logic.

Is there a way to make this `flatMap` process records in the correct event-time order?

Attempts

  • I couldn't fit my logic into windows, because there is shared state that needs to be stored and queried, and AFAIK you can't have custom state with windows (please correct me if I'm wrong).
  • I tried watermark alignment, but when I lower `maxAllowedWatermarkDrift` too far (to simulate syncing in time), processing becomes extremely slow (I guess because the sources are paused instead of buffered).

Other options I'm considering

  • Global windows. Will they help with processing in event-time order? My data set is large; would I need to buffer everything in memory?
  • Batch execution mode. Will it help with processing in event-time order?

Thanks for all the help!

1 Answer

I've discovered that using batch execution mode solves this problem. That makes sense, since the problem only arises because there is already a large backlog of data waiting to be processed.

As the Flink documentation says:

In BATCH mode, where the input dataset is known in advance, there is no need for such a heuristic as, at the very least, elements can be sorted by timestamp so that they are processed in temporal order. For readers familiar with streaming, in BATCH we can assume “perfect watermarks”.
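
To make the "sorted by timestamp" idea concrete, here is a minimal plain-Java sketch (not Flink code) of what bounded input allows: collect everything, sort by event time, then process. The `Event` class and the `inEventTimeOrder` helper are hypothetical names made up for this illustration. In an actual Flink job the mode is normally selected with `env.setRuntimeMode(RuntimeExecutionMode.BATCH)` or with `-Dexecution.runtime-mode=BATCH` on the command line, and Flink does the sorting for you.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class BatchOrderSketch {
    // Hypothetical event type: a source label plus an event-time timestamp (ms).
    static final class Event {
        final String source;
        final long timestamp;

        Event(String source, long timestamp) {
            this.source = source;
            this.timestamp = timestamp;
        }
    }

    // With bounded input, every record is available up front, so the inputs
    // can simply be concatenated and sorted by timestamp before processing.
    static List<Event> inEventTimeOrder(List<List<Event>> inputs) {
        List<Event> all = new ArrayList<>();
        for (List<Event> input : inputs) {
            all.addAll(input);
        }
        all.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        return all;
    }

    public static void main(String[] args) {
        // Two frequent inputs and one infrequent input (timestamps are made up).
        List<Event> ds1 = List.of(new Event("ds1", 1000L), new Event("ds1", 2000L));
        List<Event> ds2 = List.of(new Event("ds2", 1100L), new Event("ds2", 2100L));
        List<Event> ds3 = List.of(new Event("ds3", 1500L));

        for (Event e : inEventTimeOrder(List.of(ds1, ds2, ds3))) {
            System.out.println(e.timestamp + " " + e.source);
        }
    }
}
```

The stateful logic then sees the infrequent stream's records interleaved at the right point in time, instead of long after the frequent streams have run ahead.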

  • It is possible to have custom state in windows. `ProcessWindowFunction.Context` gives you access to `KeyedStateStore windowState()` and `KeyedStateStore globalState()`. – David Anderson Feb 28 '23 at 22:40
  • You can use a `KeyedProcessFunction` or Flink SQL to sort the stream by timestamp before applying additional processing. – David Anderson Feb 28 '23 at 22:41
  • Thanks @DavidAnderson, I'll give custom state in windows a try when I have a chance. – André Casimiro Mar 01 '23 at 12:53
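
For anyone who wants to stay in streaming mode, the sort-by-timestamp suggestion from the comments usually comes down to buffering records and releasing them only once the watermark has passed their timestamp. Below is a minimal plain-Java simulation of that idea, not Flink API: the class and method names are hypothetical, and it assumes each input on its own is already in timestamp order, so the "watermark" can be taken as the minimum of the latest timestamps seen per input. In a real job this buffer would live in keyed state inside a `KeyedProcessFunction`, with event-time timers doing the release.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class EventTimeSorterSketch {
    // Hypothetical event type: a source label plus an event-time timestamp (ms).
    static final class Event {
        final String source;
        final long timestamp;

        Event(String source, long timestamp) {
            this.source = source;
            this.timestamp = timestamp;
        }
    }

    // Min-heap ordered by timestamp holds records that are not yet safe to emit.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));
    // Latest timestamp seen on each input; Long.MIN_VALUE means "nothing yet".
    private final long[] latestPerInput;

    EventTimeSorterSketch(int numInputs) {
        latestPerInput = new long[numInputs];
        Arrays.fill(latestPerInput, Long.MIN_VALUE);
    }

    // Buffers the event, then returns every buffered event whose timestamp is
    // at or below the combined watermark (minimum over all inputs).
    List<Event> onEvent(int input, Event event) {
        buffer.add(event);
        latestPerInput[input] = Math.max(latestPerInput[input], event.timestamp);

        long watermark = Long.MAX_VALUE;
        for (long latest : latestPerInput) {
            watermark = Math.min(watermark, latest);
        }

        List<Event> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        EventTimeSorterSketch sorter = new EventTimeSorterSketch(3);
        List<Event> emitted = new ArrayList<>();
        // The fast inputs (0 and 1) run ahead; nothing is released until the
        // slow input (2) catches up.
        emitted.addAll(sorter.onEvent(0, new Event("ds1", 1L)));
        emitted.addAll(sorter.onEvent(0, new Event("ds1", 2L)));
        emitted.addAll(sorter.onEvent(1, new Event("ds2", 1L)));
        emitted.addAll(sorter.onEvent(2, new Event("ds3", 2L)));
        for (Event e : emitted) {
            System.out.println(e.timestamp + " " + e.source);
        }
    }
}
```

The trade-off is the one raised in the question: records from the fast inputs sit in the buffer until the slow input advances, so the buffer size grows with the gap between the fastest and slowest input.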