I have a stream of incoming primary keys (PK) that I am reading in my Kafkastreams app, I would like to batch them over say last 1 minute and query my transactional DB to get more data for the batch of PKs (deduplicated) in the last minute. And for each PK I would like to post a message on output topic.
I was able to code this using Processor API like below:
Topology topology = new Topology();
topology.addSource("test-source", inputKeySerde.deserializer(), inputValueSerde.deserializer(), "input.kafka.topic")
.addProcessor("test-processor", processorSupplier, "test-source")
.addSink("test-sink", "output.kafka.topic", outputKeySerde.serializer(), outputValueSerde.serializer, "test-processor");
Here processor supplier has a process method that adds the PK to a queue and a punctuator that is scheduled to run every minute and drains the queue and queries transactional DB and forwards a message for every PK.
ProcessorSupplier<Integer, ValueType> processorSupplier = new ProcessorSupplier<Integer, ValueType>() {
public Processor<Integer, ValueType> get() {
return new Processor<Integer, ValueType>() {
private ProcessorContext context;
private BlockingQueue<Integer> ids;
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(Duration.ofMillis(1000), PunctuationType.WALL_CLOCK_TIME, this::punctuate);
ids = new LinkedBlockingQueue<>();
}
@Override
public void process(Integer key, ValueType value) {
ids.add(key);
}
public void punctuate(long timestamp) {
Set<Long> idSet = new HashSet<>();
ids.drainTo(idSet, 1000);
List<Document> documentList = createDocuments(ids);
documentList.stream().forEach(document -> context.forward(document.getId(), document));
context.commit();
}
@Override
public void close() {
}
};
}
};
Wondering if there is a simpler way to accomplish this using DSL windowedBy and reduce/aggregate route?
***** Updated code to use state store ******
ProcessorSupplier<Integer, ValueType> processorSupplier = new ProcessorSupplier<Integer, ValueType>() {
public Processor<Integer, ValueType> get() {
return new Processor<Integer, ValueType>() {
private ProcessorContext context;
private KeyValueStore<Integer, Integer> stateStore;
public void init(ProcessorContext context) {
this.context = context;
stateStore = (KeyValueStore) context.getStateStore("MyStore");
this.context.schedule(Duration.ofMillis(5000), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
Set<Integer> ids = new HashSet<>();
try (KeyValueIterator<Integer, Integer> iter = this.stateStore.all()) {
while (iter.hasNext()) {
KeyValue<Integer, Integer> entry = iter.next();
ids.add(entry.key);
}
}
List<Document> documentList = createDocuments(dataRetriever, ids);
documentList.stream().forEach(document -> context.forward(document.getId(), document));
ids.stream().forEach(id -> stateStore.delete(id));
this.context.commit();
});
}
@Override
public void process(Integer key, ValueType value) {
Long id = key.getId();
stateStore.put(id, id);
}
@Override
public void close() {
}
};
}
};