  • I want to batch messages based on the timestamp the message was created.
  • Furthermore, I want to batch these messages in fixed time windows (of 1 minute).
  • Only after the passing of the window, the batch should be pushed downstream.

For this to work, the Processor API seems more or less fitting (a la KStream batch process windows):

public void process(String key, SensorData sensorData) {
    // epochTime is rounded to our preferred time window (1 minute)
    long epochTime = sensorData.epochTime;

    ArrayList<SensorData> data = messageStore.get(epochTime);
    if (data == null) {
        data = new ArrayList<SensorData>();
    }

    data.add(sensorData);
    // store the batch under the same window key it was read from
    messageStore.put(epochTime, data);
    this.context.commit();
}

@Override
public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(60000); // equal to 1 minute
}

@Override
public void punctuate(long streamTime) {
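    // messageStore is keyed by the window's epoch time (a long), so the iterator key is Long
    KeyValueIterator<Long, ArrayList<SensorData>> it = messageStore.all();
    while (it.hasNext()) {
        KeyValue<Long, ArrayList<SensorData>> entry = it.next();
        this.context.forward(entry.key, entry.value);
    }
    it.close(); // release the store iterator
    //reset messageStore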
}

However, this approach has one major downside: it doesn't use Kafka Streams windows. As a consequence:

  • out-of-order messages are not considered.
  • When operating in real time, the punctuation schedule should be equal to the desired batch time window. If we set it too short, the batch will be forwarded and downstream computation will start too quickly. If we set it too long, and punctuation is triggered when a batch window is not finished yet, we get the same problem.
  • Also, replaying historic data whilst keeping the punctuation schedule (1 minute) will trigger the first computation only after 1 minute. If so, that will blow up the state store and also feels wrong.
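
To make the first point concrete, here is a minimal plain-Java sketch of what a fixed 1-minute window does with event timestamps: each record goes into the window its own timestamp falls in, so an out-of-order record still ends up in the right batch. The class TumblingBuckets and its methods are hypothetical names for illustration only, not part of Kafka Streams, and the String values stand in for SensorData.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams:
// groups values into fixed 1-minute windows by event time.
class TumblingBuckets {
    static final long WINDOW_SIZE_MS = 60_000L;

    // Window start -> values whose timestamp falls in [start, start + WINDOW_SIZE_MS).
    private final TreeMap<Long, List<String>> buckets = new TreeMap<>();

    // Align an epoch-millisecond timestamp to the start of its window.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    // Arrival order does not matter: the bucket is chosen by the record's timestamp.
    void add(long timestampMs, String value) {
        buckets.computeIfAbsent(windowStart(timestampMs), k -> new ArrayList<>()).add(value);
    }

    Map<Long, List<String>> buckets() {
        return buckets;
    }
}
```

A record with timestamp 119999 that arrives after one with timestamp 125000 is still placed in the window starting at 60000.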

Taking these points into consideration, I should use Kafka Streams windows. But this is only possible in the Kafka Streams DSL...

Any thoughts on this would be awesome.

Raf

1 Answer


You can mix and match the DSL and the Processor API by using process(), transform(), or transformValues() within the DSL (there are some other SO questions about this already, so I will not elaborate further). Thus, you can use the regular window construct in combination with a custom (downstream) operator that holds the result back (and deduplicates). Some deduplication will already happen automatically within your window operator (as of Kafka 0.10.1; see http://docs.confluent.io/current/streams/developer-guide.html#memory-management), but if you want to have exactly one result, the cache will not do it for you.
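
To illustrate what such a hold-back operator would have to do, here is a minimal sketch in plain Java. It is only the buffering logic, not Kafka Streams code: HoldBackBuffer, update() and flush() are hypothetical names, and the Integer aggregate stands in for whatever your window operator emits. In a real topology this logic would sit inside the custom operator, with the buffer kept in a state store.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a custom downstream operator; not a Kafka Streams class.
class HoldBackBuffer {
    private final long windowSizeMs;
    // Window start -> latest aggregate seen (later updates overwrite earlier ones).
    private final TreeMap<Long, Integer> latest = new TreeMap<>();

    HoldBackBuffer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Called for every update emitted by the upstream windowed aggregation.
    void update(long windowStart, int aggregate) {
        latest.put(windowStart, aggregate);
    }

    // Called when stream time advances; returns and removes the windows that have ended.
    List<Map.Entry<Long, Integer>> flush(long streamTimeMs) {
        List<Map.Entry<Long, Integer>> closed = new ArrayList<>();
        Iterator<Map.Entry<Long, Integer>> it = latest.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Integer> e = it.next();
            if (e.getKey() + windowSizeMs > streamTimeMs) {
                break; // this window and all later ones are still open
            }
            closed.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
            it.remove();
        }
        return closed;
    }
}
```

Note that an update arriving after its window was flushed would create a new entry and be emitted again later, so late data still needs a policy of its own (drop it, or accept a second result).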

About punctuate: it is triggered based on progress (i.e., stream time) and not based on wall-clock time, so if you reprocess old data, it will be called exactly the same number of times as in your original run (just in quicker succession in terms of wall-clock time, because you process older data faster). There are also some SO questions about this if you want more details.
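
As a rough illustration of the difference, here is a simplified plain-Java model of stream-time punctuation. It is a sketch under my own assumptions, not Kafka's implementation: StreamTimePunctuator is a hypothetical class, and the exact catch-up behaviour inside Kafka may differ. The point is only that the trigger depends on record timestamps, never on the system clock.

```java
// Simplified model of stream-time punctuation; an illustration, not Kafka's implementation.
class StreamTimePunctuator {
    private final long intervalMs;
    private long nextPunctuationMs = -1L; // initialised from the first record's timestamp
    private int punctuations = 0;

    StreamTimePunctuator(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Advance stream time with a record timestamp; fire once per elapsed interval.
    void onRecord(long recordTimestampMs) {
        if (nextPunctuationMs < 0) {
            nextPunctuationMs = recordTimestampMs + intervalMs;
            return;
        }
        while (recordTimestampMs >= nextPunctuationMs) {
            punctuations++;
            nextPunctuationMs += intervalMs;
        }
    }

    int punctuations() {
        return punctuations;
    }
}
```

Feeding the same sequence of timestamps gives the same number of punctuations whether the records arrive over hours (live) or within milliseconds (replay).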

However, a general consideration: why do you need exactly one result? If you do stream processing, you might want to build your downstream consumer application so that it can handle updates to your result. This is the inherent design of Kafka: using changelogs.
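
A consumer that handles updates can be as simple as an upsert keyed by record key and window. The following plain-Java sketch shows the idea; WindowResultView and its methods are hypothetical names, and the Integer count stands in for your actual result type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical downstream consumer that treats each record as an update (upsert),
// not as a final result.
class WindowResultView {
    private final Map<String, Integer> current = new HashMap<>();

    // The view key combines record key and window start; a newer value replaces the older one.
    void apply(String key, long windowStart, int count) {
        current.put(key + "@" + windowStart, count);
    }

    // Latest known result for the given key and window, or null if none was seen.
    Integer get(String key, long windowStart) {
        return current.get(key + "@" + windowStart);
    }

    int size() {
        return current.size();
    }
}
```

With this shape it does not matter how many intermediate results a window produces: the view always holds the latest one.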

Matthias J. Sax
  • I'm working on that: using DSL with windows and let the underlying compute consumer back out when its batch is not full yet – Raf Dec 16 '16 at 07:45