- I want to batch messages based on the timestamp at which each message was created.
- Furthermore, I want to batch these messages into fixed time windows (of 1 minute).
- Only after a window has passed should its batch be pushed downstream.
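The fixed-window requirement boils down to rounding each event timestamp down to the start of its 1-minute bucket. A minimal, self-contained sketch (the class and method names here are mine, not part of any actual SensorData type):

```java
// Bucketing an epoch-millisecond timestamp into a fixed 1-minute window:
// every timestamp in [windowStart, windowStart + 60000) maps to the same bucket.
public class WindowBucket {
    static final long WINDOW_SIZE_MS = 60_000L;

    public static long windowStart(long epochMs) {
        // subtract the remainder to land on the window's lower boundary
        return epochMs - (epochMs % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        long t = 1_500_000_123_456L;         // some event time in epoch millis
        System.out.println(windowStart(t));  // start of the minute containing t
    }
}
```

All messages whose rounded timestamp is equal belong to the same batch, which is what the processor below relies on.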
For this to work, the Processor API seems more or less fitting (a la KStream batch process windows):
    public void process(String key, SensorData sensorData) {
        // round the event time down to our preferred time window (1 minute)
        long windowStart = sensorData.epochTime - (sensorData.epochTime % 60000L);
        String windowKey = String.valueOf(windowStart);

        ArrayList<SensorData> data = messageStore.get(windowKey);
        if (data == null) {
            data = new ArrayList<SensorData>();
        }
        data.add(sensorData);
        messageStore.put(windowKey, data);
        this.context.commit();
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(60000); // equal to 1 minute
    }

    @Override
    public void punctuate(long streamTime) {
        ArrayList<String> forwardedKeys = new ArrayList<String>();
        KeyValueIterator<String, ArrayList<SensorData>> it = messageStore.all();
        while (it.hasNext()) {
            KeyValue<String, ArrayList<SensorData>> entry = it.next();
            this.context.forward(entry.key, entry.value);
            forwardedKeys.add(entry.key);
        }
        it.close();
        // reset messageStore so the next window starts empty
        for (String k : forwardedKeys) {
            messageStore.delete(k);
        }
    }
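For context, wiring such a processor and its store into a topology with the (pre-1.0) Processor API could look roughly like this; the topic names, the store name, and the value serde (`sensorDataListSerde`) are assumptions on my part:

```java
// Hypothetical wiring for the processor above, using the old
// TopologyBuilder-based Processor API. All names are illustrative.
TopologyBuilder builder = new TopologyBuilder();

builder.addSource("Source", "sensor-data-topic")
       .addProcessor("BatchProcessor", BatchProcessor::new, "Source")
       // state store backing messageStore; sensorDataListSerde is an
       // assumed custom serde for ArrayList<SensorData>
       .addStateStore(Stores.create("messageStore")
                            .withKeys(Serdes.String())
                            .withValues(sensorDataListSerde)
                            .persistent()
                            .build(),
                      "BatchProcessor")
       .addSink("Sink", "batched-output-topic", "BatchProcessor");
```

Inside `init()`, the store would then be retrieved with `context.getStateStore("messageStore")`.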
However, this approach has one major downside: we don't use Kafka Streams windows.
- Out-of-order messages are not considered.
- When operating in real time, the punctuation schedule must equal the desired batch time window. If it is set too short, a batch is forwarded and downstream computation starts too quickly. If it is set too long, punctuation may still fire while a batch window is unfinished, with the same problem.
- Also, replaying historic data while keeping the 1-minute punctuation schedule will trigger the first computation only after 1 minute of wall-clock time. That will blow up the state store, and it also feels wrong.
Taking these points into consideration, I should use Kafka Streams windows. But this is only possible in the Kafka Streams DSL...
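If moving to the DSL is an option, a hedged sketch of the same 1-minute batching with DSL windows (on a more recent Kafka Streams version; the topic names and the list serde `sensorDataListSerde` are assumptions) could look like this:

```java
// Sketch only: fixed 1-minute windows with a grace period for
// out-of-order events, emitting each batch once its window closes.
KStream<String, SensorData> input = builder.stream("sensor-data-topic");

input.groupByKey()
     .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1),
                                            Duration.ofSeconds(30)))
     .aggregate(
         ArrayList::new,                                      // empty batch per window
         (key, value, batch) -> { batch.add(value); return batch; },
         Materialized.with(Serdes.String(), sensorDataListSerde))
     // only forward a window's batch after the window (plus grace) has closed
     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
     .toStream()
     .to("batched-output-topic");
```

Here `suppress(untilWindowCloses(...))` (available since Kafka Streams 2.1) addresses the "push downstream only after the window passes" requirement, and the grace period bounds how late an out-of-order record may arrive and still be included in its batch.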
Any thoughts on this would be awesome.