
Similar to, but slightly different from, this question (KStream batch process windows), I want to batch messages from a KStream before pushing them down to consumers.

However, this push-down should not be scheduled on a fixed time window, but on a fixed message-count threshold per key.

For starters, two questions come to mind:

1) Is a custom AbstractProcessor the way this should be handled? Something along the lines of:

@Override
public void punctuate(long streamTime) {
    KeyValueIterator<String, Message[]> it = messageStore.all();
    while (it.hasNext()) {
        KeyValue<String, Message[]> entry = it.next();
        if (entry.value.length >= 10) {
            this.context.forward(entry.key, entry.value);
            // reset the batch through the store; mutating entry.value would not be persisted
            messageStore.put(entry.key, new Message[0]);
        }
    }
    it.close();
}

2) Since the StateStore will potentially explode (in case an entry value never reaches the threshold in order to be forwarded), what is the best way to 'garbage-collect' this? I could do a timebased schedule and remove keys that are too old... but that looks very DIY and error prone.
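To illustrate the time-based option from question 2: one could keep a last-modified timestamp next to each batch and, on every punctuate() pass, evict keys that have been idle longer than some TTL. Below is a minimal, Kafka-free sketch of just that bookkeeping; the Batch class, TTL_MS constant, and sweep() method are hypothetical names, and a plain HashMap stands in for the state store.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class BatchGc {
    // Hypothetical per-key batch holder with a last-touched timestamp.
    static class Batch {
        long lastUpdateMs;
        int count;
        Batch(long lastUpdateMs, int count) {
            this.lastUpdateMs = lastUpdateMs;
            this.count = count;
        }
    }

    static final long TTL_MS = 60_000; // evict keys idle for more than one minute (assumed value)

    // Remove entries whose batch has not grown within TTL_MS; returns the number of evicted keys.
    static int sweep(Map<String, Batch> store, long nowMs) {
        int evicted = 0;
        Iterator<Map.Entry<String, Batch>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue().lastUpdateMs > TTL_MS) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        Map<String, Batch> store = new HashMap<>();
        store.put("fresh", new Batch(100_000, 3)); // touched at "now"
        store.put("stale", new Batch(0, 7));       // idle for 100 seconds
        int evicted = sweep(store, 100_000);
        System.out.println(evicted + " evicted; fresh kept: " + store.containsKey("fresh"));
    }
}
```

In a real processor the sweep would run inside punctuate() and delete through the KeyValueStore rather than a HashMap, but the DIY flavor (and the risk of evicting a batch that was about to complete) is the same.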

Raf

1 Answer


I guess this would work. Applying a time-based 'garbage collection' sounds reasonable, too. And yes, using the Processor API instead of the DSL has some flavor of DIY -- that's the purpose of the PAPI in the first place (empower the user to do whatever is needed).

A few comments though:

  • You will need a more complex data structure: because punctuate() is called based on stream-time progress, it can happen that you have more than 10 records for one key between two calls. Thus, you would need something like KeyValueIterator<String, List<Message[]>> it = messageStore.all(); to be able to store multiple batches per key.
  • I would assume that you will need to fine-tune the schedule for punctuate, which will be tricky -- if your schedule is too tight, many batches might not be completed yet and you waste CPU -- if your schedule is too loose, you will need a lot of memory and your downstream operators will get a lot of data as you emit a lot of stuff at once. Sending bursts of data downstream could become a problem.
  • Scanning the whole store is expensive -- it seems to be a good idea to try to "sort" your key-value pairs according to their batch size. This should enable you to only touch keys which do have completed batches instead of all keys. Maybe you can keep an in-memory list of keys that have completed batches and only do a lookup for those (on failure, you need to do a single pass over all keys from the store to recreate this in-memory list).
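The last bullet could be sketched as follows. This is a plain-Java illustration, not Kafka Streams code: a HashMap stands in for the KeyValueStore, the class and method names are hypothetical, and in a real Processor the flush would call context.forward() from punctuate().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CompletedKeys {
    static final int BATCH_SIZE = 10;

    final Map<String, List<String>> store = new HashMap<>(); // stand-in for the KeyValueStore
    final Set<String> completed = new HashSet<>();           // in-memory list of keys with a full batch

    // Called per input record: append to the key's batch and remember the key once the batch is full.
    void add(String key, String msg) {
        List<String> batch = store.computeIfAbsent(key, k -> new ArrayList<>());
        batch.add(msg);
        if (batch.size() >= BATCH_SIZE) {
            completed.add(key);
        }
    }

    // Called from punctuate(): touch only completed keys instead of scanning the whole store.
    List<List<String>> flush() {
        List<List<String>> out = new ArrayList<>();
        for (String key : completed) {
            out.add(store.remove(key)); // in a real processor: context.forward(key, batch)
        }
        completed.clear();
        return out;
    }
}
```

Since the completed set lives only in memory, it is lost on failure; as the answer notes, it would have to be rebuilt with one full scan of the (fault-tolerant) store on restore.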
Matthias J. Sax
  • Thanks for these valuable comments. More specific questions will follow with the implementation, no doubt. – Raf Dec 03 '16 at 09:29