Although this is an old question, I had the same problem. My application was restarting, and a join with an event-time window was no longer triggering because the events from one of the streams had finished before the crash. The join could recover its state, but since that stream no longer emitted any watermarks, events were never joined after the restart.
The solution I found was to checkpoint the latest watermark just after the source operator. Since there is no built-in way to persist snapshots of watermarks from a UDF, I had to create my own operator that leaves the events unchanged (an identity function) and saves the latest watermark as its state. When Flink recovers from a crash, WatermarkStreamOperator.initializeState() re-emits the last watermark checkpointed in the ListState<Long> latestWatermark via processWatermark(new Watermark(maxWatermark)). The join with the event-time window can then be triggered again.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class WatermarkStreamOperator<IN> extends AbstractUdfStreamOperator<IN, WatermarkFunction<IN>>
        implements OneInputStreamOperator<IN, IN> {

    private static final long serialVersionUID = 1L;

    /** Operator state holding the latest watermark seen by this operator. */
    private ListState<Long> latestWatermark;

    public WatermarkStreamOperator(WatermarkFunction<IN> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        System.out.println("WatermarkStreamOperator.initializeState");
        super.initializeState(context);
        ListStateDescriptor<Long> descriptor =
                new ListStateDescriptor<>("latest-watermark", Long.class);
        latestWatermark = context.getOperatorStateStore().getListState(descriptor);

        List<Long> watermarkList = new ArrayList<>();
        latestWatermark.get().forEach(watermarkList::add);
        Long maxWatermark = watermarkList.stream().max(Long::compare).orElse(0L);
        if (!maxWatermark.equals(0L)) {
            // Re-emit the recovered watermark so downstream event-time windows can fire again.
            System.out.println("watermarkList recovered max: " + maxWatermark);
            processWatermark(new Watermark(maxWatermark));
        }
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // Identity: forward the element unchanged.
        output.collect(element);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // Store the latest watermark in operator state before forwarding it.
        System.out.println("processing watermark: " + mark.getTimestamp());
        latestWatermark.update(Arrays.asList(mark.getTimestamp()));
        super.processWatermark(mark);
    }
}
And the UDF interface for the operator:
import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;

public interface WatermarkFunction<T> extends Function, Serializable {
    T process(T value) throws Exception;
}
Finally, I use .transform() to apply my WatermarkStreamOperator with MyTupleWatermarkFunc:
DataStream<Tuple2<String, Integer>> dataStream = env
        .addSource(new MySource(sentence))
        .transform("myStatefulWatermarkOperator",
                TypeInformation.of(String.class),
                new WatermarkStreamOperator<>(new MyTupleWatermarkFunc()))
        ...
        ...
public class MyTupleWatermarkFunc implements WatermarkFunction<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String process(String value) throws Exception {
        return value;
    }
}
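To connect this back to the original problem: once both inputs emit (or re-emit, after recovery) watermarks, a downstream event-time window join fires again. The sketch below only illustrates that part; otherStream, the key selectors, and the window size are placeholders, not from my actual job.

// Illustrative sketch only: join the stream above with a second input on an
// event-time tumbling window. "otherStream" is assumed to be a
// DataStream<Tuple2<String, Integer>> that also carries timestamps/watermarks;
// key selectors and window size are placeholders.
DataStream<Tuple2<String, Integer>> joined = dataStream
        .join(otherStream)
        .where(t -> t.f0, Types.STRING)
        .equalTo(t -> t.f0, Types.STRING)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .apply((left, right) -> Tuple2.of(left.f0, left.f1 + right.f1),
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));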
Here are the unit and integration tests that I created for this: https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperatorTest.java
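Those tests essentially follow Flink's operator test-harness pattern. A minimal sketch of what such a test can look like, assuming Flink's OneInputStreamOperatorTestHarness utility from the flink-streaming-java test-jar (timestamps and checkpoint ids here are arbitrary; the real tests are in the repository linked above):

// Sketch of a harness test: snapshot a watermark, then check that it is
// re-emitted after restoring a fresh operator from the snapshot.
OneInputStreamOperatorTestHarness<String, String> harness =
        new OneInputStreamOperatorTestHarness<>(
                new WatermarkStreamOperator<>(new MyTupleWatermarkFunc()));
harness.open();

// Advance the watermark and snapshot the operator state.
harness.processWatermark(new Watermark(50L));
OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
harness.close();

// Restore a fresh operator from the snapshot; initializeState() should
// re-emit the checkpointed watermark into the output.
OneInputStreamOperatorTestHarness<String, String> restored =
        new OneInputStreamOperatorTestHarness<>(
                new WatermarkStreamOperator<>(new MyTupleWatermarkFunc()));
restored.initializeState(snapshot);
restored.open();

// The restored output should now contain Watermark(50L).
System.out.println(restored.getOutput());
restored.close();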