I know I can use Flink SQL to sort a stream by timestamp, but as I'm already using CEP, I'd like to use it for sorting instead.
Sorting with CEP is pretty easy, since CEP always sorts its input by timestamp (when it runs in event time, which is why the input stream needs timestamps and watermarks). Something like this will do the trick:
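```java
import java.util.List;
import java.util.Map;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

DataStream<Event> streamWithTimestampsAndWatermarks = ...

// A single-step pattern that accepts every event, so each event becomes its own match.
Pattern<Event, ?> matchEverything =
    Pattern.<Event>begin("any")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                return true;
            }
        });

// In event time, CEP buffers the input and feeds it to the pattern in timestamp order.
PatternStream<Event> patternStream = CEP.pattern(
    streamWithTimestampsAndWatermarks, matchEverything);

// Unwrap each single-event match; the matches are emitted in sorted order.
SingleOutputStreamOperator<Event> sorted = patternStream
    .select(new PatternSelectFunction<Event, Event>() {
        @Override
        public Event select(Map<String, List<Event>> map) throws Exception {
            return map.get("any").get(0);
        }
    });
```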
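If you want to sort the stream key-by-key, rather than globally, then use `keyBy` before applying a pattern to it. (A pattern applied to a non-keyed stream runs with parallelism 1, which is what makes the sort global.)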
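To see why the key-by-key variant sorts within each key but not across keys, here is a stdlib-only Java model (Java 16+, because it uses a record) of event-time sorting. It is a simplified sketch under stated assumptions, not Flink's CEP implementation: the class `WatermarkSortModel`, its `Event` record, and its methods are hypothetical. It buffers events per key in a priority queue, and each watermark releases the buffered events whose timestamps it covers. Events at or behind the current watermark are treated as late and simply dropped in this model.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/**
 * Hypothetical model of event-time sorting (not Flink's operator code): buffer events
 * per key, and when a watermark arrives, release the buffered events it covers in
 * timestamp order. Keys are independent, so output is sorted per key, not across keys.
 */
final class WatermarkSortModel {

    /** Hypothetical event: a key, an event-time timestamp, and a payload. */
    record Event(String key, long timestamp, String payload) {}

    // One min-heap per key, ordered by timestamp (a stand-in for keyed state).
    private final Map<String, PriorityQueue<Event>> buffers = new HashMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Buffers an event; an event at or behind the current watermark is late and is dropped. */
    void processElement(Event event) {
        if (event.timestamp() <= currentWatermark) {
            return;
        }
        buffers.computeIfAbsent(event.key(),
                k -> new PriorityQueue<>(Comparator.comparingLong(Event::timestamp)))
            .add(event);
    }

    /** Advances the watermark and emits every buffered event with timestamp <= watermark. */
    List<Event> processWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Event> emitted = new ArrayList<>();
        for (PriorityQueue<Event> queue : buffers.values()) {
            // Poll the smallest timestamps first, so each key's events leave in order.
            while (!queue.isEmpty() && queue.peek().timestamp() <= currentWatermark) {
                emitted.add(queue.poll());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkSortModel model = new WatermarkSortModel();
        model.processElement(new Event("a", 3, "a@3"));
        model.processElement(new Event("a", 1, "a@1"));
        model.processElement(new Event("b", 2, "b@2"));
        model.processElement(new Event("a", 5, "a@5"));
        // Releases a@1 and a@3 (in that order) plus b@2; a@5 waits for a later watermark.
        for (Event e : model.processWatermark(3)) {
            System.out.println(e.payload());
        }
    }
}
```

Routing every event to a single key turns this into a global sort, which mirrors the non-keyed CEP case. The relative position of `b@2` among the `a` events is unspecified here, just as there is no ordering guarantee across keys.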

David Anderson
- What stops a `RichFlatMapFunction` in `sorted.keyBy(..).flatMap(new RichFlatMapFunction() {..})` from receiving events out of order, as often happens as a result of parallel execution? From what I understand, if I had a data `source` that produced events in event-time order, the `RichFlatMapFunction` in `source.keyBy(..).flatMap(new RichFlatMapFunction() {..})` might still process them out of order. – Dan Aug 15 '22 at 22:14
- If you want to work with a keyed stream, then apply the `keyBy` before using CEP to sort (the sorting will then happen within each key, but not across keys). Then you can use [reinterpretAsKeyedStream](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/experimental/#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream) to reestablish the keying before applying the flatmap, or you can just use a `keyBy`, but that will cause a pointless shuffle. – David Anderson Aug 15 '22 at 23:38
- Thanks. How does the CEP stream guarantee that downstream functions, such as a `RichFlatMapFunction`, receive data in sorted order, whereas the downstream functions of a regular `SourceFunction` that emits events in sorted order, for example, do not? – Dan Aug 15 '22 at 23:57
- So long as two events follow the same path through the execution graph, they will maintain their relative ordering. After the `keyBy`, all of the events for a given key will pass through the same instance of the CEP operator, and will exit that operator in sorted order. So long as you don't do anything after the CEP operator to reshuffle its output streams (e.g., by using `keyBy` with another key, or doing a `rebalance`), those events will stay in order. – David Anderson Aug 16 '22 at 02:01
- Similarly, for a source operator, if all of the events for each key are in the same instance of the source, then they will retain their ordering, even if you key by that key, because there's no possible race condition. However, if you key by some other key, or change the parallelism (with a `rebalance`), then those events will be able to race because they then take different paths through the graph. – David Anderson Aug 16 '22 at 02:06
- Thank you for the explanation. So to keep a sorted stream `sorted.keyBy(..).map(..)` in sorted order after the `map`, would I have to keep it keyed, like `sorted.keyBy(..).map(..).keyBy(..).map(..)`? And if I were to do `sorted.keyBy(..).map(..).map(..)`, omitting the second `keyBy(..)`, is there a chance that the stream would become out of order on the last map? Also, please feel free to mark the answer as correct, as my testing shows it does exactly what I want. – Dan Aug 16 '22 at 08:05
- No, that's not correct. `map` will not cause the stream to lose its ordering. Only an operation like a `keyBy` (and only if you change keys) or a `rebalance` will have this effect. – David Anderson Aug 16 '22 at 14:34
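The "same path keeps its order" rule from these comments can be illustrated with a toy Java model. It is an assumption for illustration, not Flink's network stack: each upstream instance sends records to a downstream task over its own FIFO channel, and the downstream task reads the channels in an arbitrary interleaving, given here as an explicit schedule of channel indices. The class `ChannelOrderingModel`, its methods, and the record names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical model: every upstream instance talks to a downstream task over its own
 * FIFO channel. The reader may interleave channels in any order, but it can only take
 * the head of a channel, so records sharing a channel can never overtake each other.
 */
final class ChannelOrderingModel {

    /** Reads the channels following `schedule`, a sequence of channel indices to pull from. */
    static List<String> merge(List<List<String>> channels, int[] schedule) {
        List<Deque<String>> queues = new ArrayList<>();
        for (List<String> channel : channels) {
            queues.add(new ArrayDeque<>(channel)); // copy so the caller's lists are untouched
        }
        List<String> received = new ArrayList<>();
        for (int index : schedule) {
            String next = queues.get(index).pollFirst(); // FIFO: only the head is readable
            if (next != null) {
                received.add(next);
            }
        }
        return received;
    }

    /** Keeps only the records whose name starts with the given key, preserving their order. */
    static List<String> forKey(List<String> records, String key) {
        List<String> out = new ArrayList<>();
        for (String record : records) {
            if (record.startsWith(key)) {
                out.add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same path: all "a" records use channel 0 and all "b" records use channel 1,
        // as after a keyBy, or along a forward connection such as a following map.
        List<List<String>> keyed = List.of(List.of("a1", "a2", "a3"), List.of("b1", "b2"));
        System.out.println(forKey(merge(keyed, new int[] {0, 1, 0, 1, 0}), "a")); // [a1, a2, a3]
        System.out.println(forKey(merge(keyed, new int[] {1, 1, 0, 0, 0}), "a")); // [a1, a2, a3]

        // Different paths: a round-robin rebalance spreads "a" over both channels,
        // so the reader can see a2 before a1.
        List<List<String>> rebalanced = List.of(List.of("a1", "a3"), List.of("a2"));
        System.out.println(forKey(merge(rebalanced, new int[] {1, 0, 0}), "a")); // [a2, a1, a3]
    }
}
```

In this model a `map` that keeps the same parallelism as its input behaves like the keyed case, because each record continues along the channel it arrived on; only redistributing records across channels, as a `rebalance` or a `keyBy` on a different key does, lets them race.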