How can I implement an operator with Flink's DataStream API that sends an event when no data was received from a stream for a certain amount of time?
Such an operator can be implemented using a `ProcessFunction`.
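```java
import org.apache.flink.api.java.functions.NullByteKeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L);
input
    // use keyBy to have keyed state.
    // NullByteKeySelector will move all data to one task. You can also use other keys
    .keyBy(new NullByteKeySelector<Long>())
    // use process function with 60 seconds timeout
    .process(new TimeOutFunction(60 * 1000));
```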
The `TimeOutFunction` is defined as follows. In this example it uses processing time.
```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public static class TimeOutFunction extends ProcessFunction<Long, Boolean> {

    // delay after which an alert flag is thrown
    private final long timeOut;

    // state to remember the last timer set
    private transient ValueState<Long> lastTimer;

    public TimeOutFunction(long timeOut) {
        this.timeOut = timeOut;
    }

    @Override
    public void open(Configuration conf) {
        // set up timer state
        ValueStateDescriptor<Long> lastTimerDesc =
            new ValueStateDescriptor<>("lastTimer", Long.class);
        lastTimer = getRuntimeContext().getState(lastTimerDesc);
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Boolean> out) throws Exception {
        // get current time and compute timeout time
        long currentTime = ctx.timerService().currentProcessingTime();
        long timeoutTime = currentTime + timeOut;
        // register timer for timeout time
        ctx.timerService().registerProcessingTimeTimer(timeoutTime);
        // remember timeout time; earlier timers still fire but are ignored in onTimer
        lastTimer.update(timeoutTime);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Boolean> out) throws Exception {
        // check if this was the last timer we registered
        Long last = lastTimer.value();
        if (last != null && timestamp == last) {
            // it was, so no data was received afterwards.
            // fire an alert.
            out.collect(true);
        }
    }
}
```
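A possible variant of the same idea, sketched under the assumption of Flink 1.x with the `KeyedProcessFunction` API (the class name `KeyedTimeOutFunction` is hypothetical, and the key type `Byte` assumes the `NullByteKeySelector` used above). Instead of letting stale timers fire and filtering them out in `onTimer`, it deletes the previous timer with `TimerService.deleteProcessingTimeTimer`, so at most one timer per key is pending at any time.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical variant: emits true after `timeOut` ms without any element for the key.
public class KeyedTimeOutFunction extends KeyedProcessFunction<Byte, Long, Boolean> {

    // delay after which an alert flag is emitted
    private final long timeOut;

    // the single processing-time timer currently registered for this key
    private transient ValueState<Long> pendingTimer;

    public KeyedTimeOutFunction(long timeOut) {
        this.timeOut = timeOut;
    }

    @Override
    public void open(Configuration conf) {
        pendingTimer = getRuntimeContext().getState(
            new ValueStateDescriptor<>("pendingTimer", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Boolean> out) throws Exception {
        Long previous = pendingTimer.value();
        if (previous != null) {
            // data arrived in time: the old timeout is no longer relevant
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }
        long timeoutTime = ctx.timerService().currentProcessingTime() + timeOut;
        ctx.timerService().registerProcessingTimeTimer(timeoutTime);
        pendingTimer.update(timeoutTime);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Boolean> out) throws Exception {
        // only the most recent timer is still registered, so firing means silence
        pendingTimer.clear();
        out.collect(true);
    }
}
```

It would be wired in the same way, e.g. `input.keyBy(new NullByteKeySelector<Long>()).process(new KeyedTimeOutFunction(60 * 1000))`. The trade-off is an extra state read and timer deletion per element, in exchange for not accumulating a pending timer for every received element. Like the original, it only starts timing once the first element for a key has arrived.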

Fabian Hueske
- A small tweak: this setup is fine if the stream receives data at least once. Is there any way to detect if the stream does not receive data at all, not even once? – madhairsilence Jul 03 '18 at 06:08
- Can I use your solution with a Kafka consumer, including an `out.collect` in `processElement`? (My full question is https://stackoverflow.com/questions/58280077/apache-flink-kafka-stream-get-all-messages-and-stop.) At the moment my consumer doesn't stop and keeps fetching indefinitely. – techkuz Oct 09 '19 at 04:33
- In the above example we register a processing-time timer. Do we need to specify `TimeCharacteristic.EventTime`/`ProcessingTime`, or does it not matter in this context? Any hint? – Mazen Ezzeddine Aug 26 '20 at 14:03
- You can always register processing-time timers. If you need event time, you should enable it via the `TimeCharacteristic`. – Fabian Hueske Aug 27 '20 at 12:55
- This does not answer the question. It only works if the process function receives at least one element, because a timer can only be registered in `processElement`. – kekkler Nov 11 '20 at 19:41
You could set up a time window with a custom trigger. In the trigger, every time an event is received, the `onElement` method would register a processing-time timer for `currentTime + desiredTimeDelay`. When a new event arrives, you delete the previously registered timer and register a new one. If no event arrives before the system time reaches the timer's time, the timer fires and the window is processed. Even if no events came, the window would still be processed; the list of events to be processed would just be empty.
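A minimal sketch of such a trigger, assuming Flink 1.x's `Trigger` API and a `GlobalWindows` assigner (the answer only says "a time window", so the assigner choice and the class name `InactivityTrigger` are assumptions). `onElement` replaces the pending processing-time timer with a new one, and `onProcessingTime` fires and purges the window once a timer expires without a newer element having arrived.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Hypothetical trigger: fires the window when no element arrived for `timeout` ms.
public class InactivityTrigger<T> extends Trigger<T, GlobalWindow> {

    private final long timeout;

    // per-key/per-window state holding the currently registered timer
    private final ValueStateDescriptor<Long> timerDesc =
        new ValueStateDescriptor<>("inactivityTimer", Long.class);

    public InactivityTrigger(long timeout) {
        this.timeout = timeout;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, GlobalWindow window,
                                   TriggerContext ctx) throws Exception {
        ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
        Long previous = timer.value();
        if (previous != null) {
            // a new event arrived in time: cancel the pending timeout
            ctx.deleteProcessingTimeTimer(previous);
        }
        long next = ctx.getCurrentProcessingTime() + timeout;
        ctx.registerProcessingTimeTimer(next);
        timer.update(next);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window,
                                          TriggerContext ctx) throws Exception {
        // no element arrived before the timer: evaluate the window and drop its contents
        ctx.getPartitionedState(timerDesc).clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        // event time is not used by this trigger
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
        Long pending = timer.value();
        if (pending != null) {
            ctx.deleteProcessingTimeTimer(pending);
        }
        timer.clear();
    }
}
```

It could be attached with something like `input.keyBy(new NullByteKeySelector<Long>()).window(GlobalWindows.create()).trigger(new InactivityTrigger<Long>(60 * 1000))`, followed by a window function of your choice. As with the `ProcessFunction` approach, a window (and thus its timer) only exists once an element for that key has arrived.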

Jicaar
- I would go with @Fabian Hueske's answer. It is more straightforward for your purposes. – Jicaar Nov 02 '17 at 15:30