Similar to, but slightly different from, this question: KStream batch process windows. I want to batch messages from a KStream
before pushing them down to consumers.
However, this push-down should not be triggered on a fixed time window, but when a fixed message-count threshold is reached per key.
For starters, two questions come to mind:
1) Is a custom AbstractProcessor the way this should be handled? Something along the lines of:
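    @Override
    public void punctuate(long streamTime) {
        // Close the iterator when done so the store can release its resources.
        try (KeyValueIterator<String, Message[]> it = messageStore.all()) {
            while (it.hasNext()) {
                KeyValue<String, Message[]> entry = it.next();
                if (entry.value.length > 10) {
                    this.context.forward(entry.key, entry.value);
                    // KeyValue fields are final, so reset the batch through the store.
                    messageStore.put(entry.key, new Message[0]);
                }
            }
        }
    }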
2) Since the StateStore will potentially explode (in case an entry value never reaches the threshold needed to be forwarded), what is the best way to 'garbage-collect' it? I could run a time-based schedule and remove keys that are too old... but that looks very DIY and error-prone.
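
To make the intended behaviour concrete, here is a minimal plain-Java sketch of per-key count batching with the time-based cleanup mentioned in 2). It does not use the Kafka Streams API at all: a HashMap stands in for the StateStore, and the class name KeyedBatcher, its methods, and the threshold and maxAgeMs parameters are all hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not a Kafka Streams class): buffers values per key. */
class KeyedBatcher<K, V> {

    /** Pending values for one key plus the time of the last update. */
    private static final class Buffer<V> {
        final List<V> values = new ArrayList<>();
        long lastUpdateMs;
    }

    private final int threshold;  // batch size that triggers a flush
    private final long maxAgeMs;  // idle time after which a partial batch is evicted
    // Stand-in for the StateStore; an assumption made for this sketch only.
    private final Map<K, Buffer<V>> store = new HashMap<>();

    KeyedBatcher(int threshold, long maxAgeMs) {
        this.threshold = threshold;
        this.maxAgeMs = maxAgeMs;
    }

    /** Adds a value; returns the full batch once the key hits the threshold, else an empty list. */
    List<V> add(K key, V value, long nowMs) {
        Buffer<V> buf = store.computeIfAbsent(key, k -> new Buffer<>());
        buf.values.add(value);
        buf.lastUpdateMs = nowMs;
        if (buf.values.size() < threshold) {
            return new ArrayList<>();
        }
        store.remove(key); // the batch is complete, so drop the key from the store
        return buf.values;
    }

    /** Removes keys that have been idle longer than maxAgeMs; returns their partial batches. */
    Map<K, List<V>> evictStale(long nowMs) {
        Map<K, List<V>> evicted = new HashMap<>();
        Iterator<Map.Entry<K, Buffer<V>>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Buffer<V>> e = it.next();
            if (nowMs - e.getValue().lastUpdateMs > maxAgeMs) {
                evicted.put(e.getKey(), e.getValue().values);
                it.remove(); // safe removal while iterating
            }
        }
        return evicted;
    }

    /** Number of keys that still hold an incomplete batch. */
    int pendingKeys() {
        return store.size();
    }
}
```

In this sketch the threshold check happens as each message arrives, so a batch goes out as soon as it is full, and evictStale is the part that would run on a schedule. Whether the evicted partial batches should be forwarded or dropped is left open here.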