
As one of the last steps in a streaming application, I want to sort out-of-order events. To do so I used:

events.keyBy((Event event) -> event.id)
                .process(new SortFunction())
                .print();

where SortFunction is:

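import java.util.PriorityQueue;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Buffers each key's events and emits them in timestamp order as the watermark advances.
// Assumes Event implements Comparable<Event> by timestamp (the PriorityQueue has no comparator).
public static class SortFunction extends KeyedProcessFunction<String, Event, Event> {
    private ValueState<PriorityQueue<Event>> queueState = null;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
                // state name
                "sorted-events",
                // type information of state
                TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {}));
        queueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
        TimerService timerService = context.timerService();

        // Events at or behind the current watermark are late and are silently dropped.
        if (context.timestamp() > timerService.currentWatermark()) {
            PriorityQueue<Event> queue = queueState.value();
            if (queue == null) {
                queue = new PriorityQueue<>(10);
            }
            queue.add(event);
            queueState.update(queue);
            timerService.registerEventTimeTimer(event.timestamp);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
        PriorityQueue<Event> queue = queueState.value();
        long watermark = context.timerService().currentWatermark();
        // Emit every buffered event that is now at or behind the watermark, oldest first.
        while (queue != null && !queue.isEmpty() && queue.peek().timestamp <= watermark) {
            out.collect(queue.poll());
        }
        // Write the shrunken queue back so that non-heap state backends also see the removals.
        queueState.update(queue);
    }
}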
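To see what the buffering and timer logic in SortFunction amounts to, here is a plain-Java simulation with no Flink dependency, modelling a single key. The class names `SimEvent` and `WatermarkSorter` are hypothetical. Events are kept in a PriorityQueue ordered by timestamp, events at or behind the watermark are dropped as in processElement, and advancing the watermark releases everything at or before it, which collapses the individual timer firings of onTimer into one drain.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for the question's Event type: only the timestamp matters here.
final class SimEvent {
    final String id;
    final long timestamp;

    SimEvent(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

// Plain-Java model of SortFunction for a single key (no Flink involved).
final class WatermarkSorter {
    private final PriorityQueue<SimEvent> queue =
            new PriorityQueue<>(Comparator.comparingLong((SimEvent e) -> e.timestamp));
    private long watermark = Long.MIN_VALUE;

    // Mirrors processElement: buffer only events that are not already late.
    void add(SimEvent event) {
        if (event.timestamp > watermark) {
            queue.add(event);
        }
    }

    // Mirrors the effect of the timers firing: emit everything at or before the new watermark.
    List<SimEvent> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<SimEvent> out = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            out.add(queue.poll());
        }
        return out;
    }
}
```

For example, adding events at 5, 2 and 9 and then advancing the watermark to 6 releases the events at 2 and 5 in that order; the event at 9 stays buffered until the watermark passes it.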

What I'm trying to do now is parallelize it. My current idea is the following:

events.keyBy((Event event) -> event.id)
                .rebalance()
                .process(new SortFunction()).setParallelism(3)
                .map(new KWayMerge()).setParallelism(1)
                .print();

If I understand correctly (and please correct me if I am wrong), in this case roughly a third of the events for a given key will go to each of the parallel instances of SortFunction. To get a complete sort, I then need a map, or another process function, that receives the sorted events from the three different instances and merges them back together.

If that's the case, is there any way to tell which instance an event received by the map came from, so that I can perform a 3-way merge in the map? If that's not possible, my next idea is to replace the PriorityQueue with a TreeMap and put everything into a window, so that the merge happens at the end of the window once the three TreeMaps have been received. Does this second option make sense if the first one is not viable, or is there a better way to do something like this?
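On telling the inputs apart: as far as I know, a downstream map function only sees the record itself, not the upstream instance that sent it, so one option (an assumption, not something Flink does for you) is to have each sorter wrap its output together with its own index, for example the subtask index from the runtime context (`RuntimeContext#getIndexOfThisSubtask()` in Flink 1.x; check your version). The merge itself can then be a streaming k-way merge, sketched below in plain Java with hypothetical names `Tagged` and `KWayMerger`. The key point is that the smallest head can only be emitted once every input has something buffered, since an input that is currently empty might still produce a smaller timestamp.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper: each upstream sorter tags its output with its own index.
final class Tagged {
    final int source;
    final long timestamp;

    Tagged(int source, long timestamp) {
        this.source = source;
        this.timestamp = timestamp;
    }
}

// Streaming k-way merge: each input is individually sorted, the output is globally sorted.
final class KWayMerger {
    private final List<ArrayDeque<Tagged>> pending = new ArrayList<>();

    KWayMerger(int k) {
        for (int i = 0; i < k; i++) {
            pending.add(new ArrayDeque<>());
        }
    }

    // Buffer the record under its origin, then emit whatever is provably next.
    List<Tagged> offer(Tagged t) {
        pending.get(t.source).addLast(t);
        List<Tagged> out = new ArrayList<>();
        // Safe to emit only while every input has at least one buffered record.
        while (pending.stream().noneMatch(ArrayDeque::isEmpty)) {
            ArrayDeque<Tagged> min = pending.get(0);
            for (ArrayDeque<Tagged> q : pending) {
                if (q.peekFirst().timestamp < min.peekFirst().timestamp) {
                    min = q;
                }
            }
            out.add(min.pollFirst());
        }
        return out;
    }
}
```

With three inputs a linear scan over the heads is enough; for many inputs a heap of heads would be the usual choice. Note that this merge stalls whenever one input goes quiet, so a real job would need watermarks (or end of input) to decide when an empty input can no longer produce anything smaller.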

James

1 Answer


First of all, you should be aware that storing a PriorityQueue or a TreeMap in Flink ValueState is an okay idea if and only if you are using a heap-based state backend. With RocksDB this will perform quite badly, because the PriorityQueue will be deserialized on every access and reserialized on every update. In general we recommend sorting based on MapState, and this is how sorting is implemented in Flink's libraries.
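One common shape for MapState-based sorting (an assumption here, not a copy of Flink's library code) is to use the timestamp as the map key, for example `MapState<Long, List<Event>>`, register an event-time timer for each timestamp, and in onTimer emit and clear only the entry for the timer's timestamp. Flink keeps at most one timer per key and timestamp, and event-time timers fire in timestamp order, so the output comes out sorted, while each access touches a single map entry instead of the whole queue. The plain-Java model below has no Flink dependency: the hypothetical `MapStateSorter` uses a HashMap as a stand-in for MapState and a TreeSet as a stand-in for the timer service.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Plain-Java model of MapState-based sorting for one key (names are hypothetical).
final class MapStateSorter {
    // Stand-in for MapState<Long, List<...>>: timestamp -> events carrying that timestamp.
    private final Map<Long, List<String>> byTimestamp = new HashMap<>();
    // Stand-in for the timer service: one pending timer per distinct timestamp, fired in order.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long watermark = Long.MIN_VALUE;

    // Like processElement: append to a single map entry and register a timer.
    void add(String event, long timestamp) {
        if (timestamp <= watermark) {
            return; // late, dropped as in the question's code
        }
        byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(event);
        timers.add(timestamp);
    }

    // Fire all timers up to the watermark; each firing reads and clears one entry (like onTimer).
    List<String> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<String> out = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long ts = timers.pollFirst();
            out.addAll(byTimestamp.remove(ts));
        }
        return out;
    }
}
```

Events that share a timestamp come out in arrival order, because they are appended to the same list entry.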

What this code will do

events.keyBy((Event event) -> event.id)
            .process(new SortFunction())

is to sort the stream independently on a key-by-key basis: the output will be sorted within each key, but not globally.
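A small plain-Java check makes the distinction concrete (the helper `OrderCheck` is hypothetical). Because onTimer emits every buffered event up to the current watermark for the key whose timer fired, a jump in the watermark can, for example, emit key A's events at 1 and 8 before key B's event at 5: each key is in order, but the stream as a whole is not.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Checks for the two orderings discussed above (plain Java, hypothetical helper).
final class OrderCheck {
    // True if timestamps never decrease across the whole output.
    static boolean sortedGlobally(List<Long> timestamps) {
        for (int i = 1; i < timestamps.size(); i++) {
            if (timestamps.get(i) < timestamps.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    // True if, for every key, its own timestamps never decrease.
    // keys.get(i) and timestamps.get(i) describe the i-th emitted record.
    static boolean sortedPerKey(List<String> keys, List<Long> timestamps) {
        Map<String, Long> lastSeen = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            Long prev = lastSeen.put(keys.get(i), timestamps.get(i));
            if (prev != null && timestamps.get(i) < prev) {
                return false;
            }
        }
        return true;
    }
}
```

For the output A@1, A@8, B@5, `sortedPerKey` returns true while `sortedGlobally` returns false.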

On the other hand, this

events.keyBy((Event event) -> event.id)
                .rebalance()
                .process(new SortFunction()).setParallelism(3)

won't work, because the result of the rebalance is no longer a KeyedStream, and the SortFunction depends on keyed state.
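To see why keyed state cannot survive the rebalance(), it helps to compare where the events of a single key end up under each strategy. The plain-Java model below is simplified (it is not Flink's actual key-group assignment, and Flink's round-robin does not necessarily start at partition 0), and the class `Partitioning` is hypothetical: with key-based partitioning the target is a pure function of the key, so all of a key's events, and its state, live in one instance; with round-robin the target depends only on arrival order, so one key's events are spread across every instance.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of two partitioning strategies (not Flink's exact algorithms).
final class Partitioning {
    // keyBy-like: the partition depends only on the key, so a key always lands in one place.
    static int byKey(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // rebalance-like: round-robin by arrival position, ignoring the key entirely.
    static int roundRobin(int arrivalIndex, int parallelism) {
        return arrivalIndex % parallelism;
    }

    // Which partitions receive events for the given key under the chosen strategy?
    static Set<Integer> partitionsForKey(List<String> arrivals, String key, int parallelism, boolean keyed) {
        Set<Integer> seen = new HashSet<>();
        for (int i = 0; i < arrivals.size(); i++) {
            if (arrivals.get(i).equals(key)) {
                seen.add(keyed ? byKey(key, parallelism) : roundRobin(i, parallelism));
            }
        }
        return seen;
    }
}
```

Three consecutive events for the same key reach one partition under key-based partitioning but three different partitions under round-robin, so no single instance could hold that key's queue.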

Moreover, I don't believe that doing 3 sorts of 1/3 of the stream and then merging the results will perform noticeably better than a single global sort. If you need to do a global sort, you might want to consider using the Table API instead. See the answer here for an example.

David Anderson