I have a custom Flink Source and a SerializableTimestampAssigner that assigns event timestamps to the records the source emits. Because of the nature of the underlying data storage, the source may emit records out of order. However, in BATCH mode I expect Flink to sort these records by event timestamp before any operator processes them.
From the Flink documentation on execution mode:
In BATCH mode, where the input dataset is known in advance, there is no need for such a heuristic as, at the very least, elements can be sorted by timestamp so that they are processed in temporal order.
However, this doesn't seem to be the case. If I create a DataStream from the Source (StreamExecutionEnvironment.fromSource) with my timestamp assigner and then call datastream.addSink(x => println(extractTimestamp(x))), the printed timestamps are not in ascending order. Is my understanding of the documentation wrong? Or does Flink expect users to sort the input dataset themselves?