As one of the last steps in a streaming application, I want to sort the out-of-order events in the system. To do so I used:
events.keyBy((Event event) -> event.id)
.process(new SortFunction())
.print();
where SortFunction is:
public static class SortFunction extends KeyedProcessFunction<String, Event, Event> {
private ValueState<PriorityQueue<Event>> queueState = null;
@Override
public void open(Configuration config) {
ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
// state name
"sorted-events",
// type information of state
TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
}));
queueState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
TimerService timerService = context.timerService();
if (context.timestamp() > timerService.currentWatermark()) {
PriorityQueue<Event> queue = queueState.value();
if (queue == null) {
queue = new PriorityQueue<>(10);
}
queue.add(event);
queueState.update(queue);
timerService.registerEventTimeTimer(event.timestamp);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
PriorityQueue<Event> queue = queueState.value();
Long watermark = context.timerService().currentWatermark();
Event head = queue.peek();
while (head != null && head.timestamp <= watermark) {
out.collect(head);
queue.remove(head);
head = queue.peek();
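}
// Write the drained queue back; state backends that serialize on access (e.g. RocksDB) would otherwise lose the removals
queueState.update(queue);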
}
}
What I'm trying to do now is parallelize it. My current idea is to do the following:
events.keyBy((Event event) -> event.id)
.rebalance()
.process(new SortFunction()).setParallelism(3)
.map(new KWayMerge()).setParallelism(1)
.print();
If I understand correctly (and please correct me if I am wrong), in this case a portion of the Events for a given key (ideally 1/3) will go to each of the parallel instances of SortFunction. To get a complete sort, I would then need to create a map, or another processFunction, that receives the sorted Events from the 3 different instances and merges them back together.
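To make the merge step concrete, this is roughly the k-way merge I have in mind, sketched on plain in-memory lists rather than Flink streams. The class names (KWayMergeSketch, Cursor) and the minimal Event stand-in are made up for illustration and are not part of my actual job. Note that the merge only works because each cursor knows which sorted source it belongs to, which is exactly the information I don't know how to get inside the map.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-alone sketch, not Flink code: merges k lists that are
// each already sorted by timestamp into one sorted list.
class KWayMergeSketch {

    // Minimal stand-in for the Event type (assumed fields: id, timestamp).
    static class Event {
        final String id;
        final long timestamp;
        Event(String id, long timestamp) { this.id = id; this.timestamp = timestamp; }
    }

    // Cursor into one sorted source; `source` records which input it reads from.
    static class Cursor {
        final int source;
        final List<Event> events;
        int position = 0;
        Cursor(int source, List<Event> events) { this.source = source; this.events = events; }
        Event current() { return events.get(position); }
        boolean advance() { return ++position < events.size(); }
    }

    static List<Event> merge(List<List<Event>> sortedSources) {
        // The heap orders the cursors by the timestamp of their current element.
        PriorityQueue<Cursor> heads =
            new PriorityQueue<>(Comparator.comparingLong((Cursor c) -> c.current().timestamp));
        for (int i = 0; i < sortedSources.size(); i++) {
            if (!sortedSources.get(i).isEmpty()) {
                heads.add(new Cursor(i, sortedSources.get(i)));
            }
        }
        List<Event> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            Cursor smallest = heads.poll();
            merged.add(smallest.current());
            if (smallest.advance()) {
                heads.add(smallest); // re-insert with its next element as the new head
            }
        }
        return merged;
    }
}
```

In the streaming case the inputs are unbounded, so the merge could only emit an element once it knows that none of the other sources can still produce a smaller timestamp.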
If that's the case, is there any way to distinguish the origin of the Event received by the map so that I can perform a 3-way merge in the map? If that's not possible, my next idea is to swap the PriorityQueue for a TreeMap and put everything into a window, so that the merge happens at the end of the window once the 3 TreeMaps have been received. Does this second option make sense if the first one is not viable, or is there a better way to do something like this?
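For the TreeMap idea, the merge at the end of the window would look roughly like the sketch below. It is plain Java, not Flink code: TreeMapMergeSketch is a made-up name, and I am assuming each parallel instance hands over a TreeMap keyed by timestamp, with String payloads standing in for my Events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: combines the per-instance TreeMaps (timestamp -> events
// with that timestamp) into a single map that iterates in timestamp order.
class TreeMapMergeSketch {

    static TreeMap<Long, List<String>> merge(List<TreeMap<Long, List<String>>> partials) {
        TreeMap<Long, List<String>> merged = new TreeMap<>();
        for (TreeMap<Long, List<String>> partial : partials) {
            for (Map.Entry<Long, List<String>> entry : partial.entrySet()) {
                // Events sharing a timestamp are appended to the same bucket.
                merged.computeIfAbsent(entry.getKey(), ts -> new ArrayList<>())
                      .addAll(entry.getValue());
            }
        }
        return merged;
    }
}
```

With this variant the origin of each map no longer matters, but nothing can be emitted until all 3 maps for the window have arrived.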