0

I have a stream of incoming primary keys (PK) that I am reading in my Kafkastreams app, I would like to batch them over say last 1 minute and query my transactional DB to get more data for the batch of PKs (deduplicated) in the last minute. And for each PK I would like to post a message on output topic.

I was able to code this using Processor API like below:

Topology topology = new Topology();
    topology.addSource("test-source", inputKeySerde.deserializer(), inputValueSerde.deserializer(), "input.kafka.topic")
        .addProcessor("test-processor", processorSupplier, "test-source")
        .addSink("test-sink", "output.kafka.topic", outputKeySerde.serializer(), outputValueSerde.serializer, "test-processor");

Here processor supplier has a process method that adds the PK to a queue and a punctuator that is scheduled to run every minute and drains the queue and queries transactional DB and forwards a message for every PK.

    ProcessorSupplier<Integer, ValueType> processorSupplier = new ProcessorSupplier<Integer, ValueType>() {
      public Processor<Integer, ValueType> get() {
        return new Processor<Integer, ValueType>() {
          private ProcessorContext context;
          private BlockingQueue<Integer> ids;

          public void init(ProcessorContext context) {
            this.context = context;
            this.context.schedule(Duration.ofMillis(1000), PunctuationType.WALL_CLOCK_TIME, this::punctuate);
            ids = new LinkedBlockingQueue<>();
          }

          @Override
          public void process(Integer key, ValueType value) {
            ids.add(key);
          }

          public void punctuate(long timestamp) {
            Set<Long> idSet = new HashSet<>();
            ids.drainTo(idSet, 1000);
            List<Document> documentList = createDocuments(ids);
            documentList.stream().forEach(document -> context.forward(document.getId(), document));
            context.commit();
          }

          @Override
          public void close() {
          }
        };
      }
    };

Wondering if there is a simpler way to accomplish this using DSL windowedBy and reduce/aggregate route?

***** Updated code to use state store ******

    ProcessorSupplier<Integer, ValueType> processorSupplier = new ProcessorSupplier<Integer, ValueType>() {
      public Processor<Integer, ValueType> get() {
        return new Processor<Integer, ValueType>() {
          private ProcessorContext context;
          private KeyValueStore<Integer, Integer> stateStore;

          public void init(ProcessorContext context) {
            this.context = context;
            stateStore = (KeyValueStore) context.getStateStore("MyStore");
            this.context.schedule(Duration.ofMillis(5000), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
              Set<Integer> ids = new HashSet<>();
              try (KeyValueIterator<Integer, Integer> iter = this.stateStore.all()) {
                while (iter.hasNext()) {
                  KeyValue<Integer, Integer> entry = iter.next();
                  ids.add(entry.key);
                }
              }
              List<Document> documentList = createDocuments(dataRetriever, ids);
              documentList.stream().forEach(document -> context.forward(document.getId(), document));
              ids.stream().forEach(id -> stateStore.delete(id));
              this.context.commit();
            });
          }

          @Override
          public void process(Integer key, ValueType value) {
            Long id = key.getId();
            stateStore.put(id, id);
          }

          @Override
          public void close() {
          }
        };
      }
    };
prixa
  • 80
  • 7
  • Using the Processor API seems to be the better choice. – Matthias J. Sax Jan 10 '20 at 03:47
  • Understand. Is this possible to solve using DSL? If yes, how? Also why is processor API a better choice? – prixa Jan 10 '20 at 22:52
  • The DSL is not flexible enough and does not support punctuations. – Matthias J. Sax Jan 11 '20 at 04:25
  • Thanks @MatthiasJ.Sax ! I updated the question to include the processor code. Does this guarantee that if punctuate() throws an error, the input topic offset is not advanced? If not, how can I ensure that? – prixa Jan 14 '20 at 01:15
  • Yes, if `punctuate()` throws an exception, offsets would not be committed, and the `StreamThread` would die. – Matthias J. Sax Jan 14 '20 at 03:46
  • Would you be able to point me to a good document that explains when offsets are committed and best practices for exception handling for Kafka streams processor API? I did not find much here : https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html – prixa Jan 14 '20 at 19:29
  • There is not much documentation. Offset are committed based on `commit.interval.ms` configuration. If any exception is thrown, Kafka Streams stops processing, no offsets are committed, and Kafka Streams dies. – Matthias J. Sax Jan 14 '20 at 21:40
  • Thanks, so the right exception handling would be 1. Catch all expected exceptions, log them and continue/abort as necessary 2. Make sure you have enough streams threads/nodes running that if a couple die due to an unexpected error, others can take over? – prixa Jan 14 '20 at 21:59
  • In the above processor code, what happens when context.commit() is called? I am using a BlockingQueue so I can drain ids and not reprocess processed ids. Would context.commit() store the current values in ids queue somewhere, or should I use an in-memory fault-tolerant KeyValueStore for this? – prixa Jan 14 '20 at 22:06
  • About exception handling: correct. There are some more questions on SO about it -- might be worth to browse around. About context.commit() -> https://stackoverflow.com/questions/43416178/how-to-commit-manually-with-kafka-stream -- If you use an BlockingQueue it's not fault-tolerant -- you would need to use an build-in store like in-memory KeyValueStore to get fault-tolerance from Kafka Streams. – Matthias J. Sax Jan 15 '20 at 05:15
  • Thanks @MatthiasJ.Sax. Regarding using KeyValueStore, whats the best way to drain the values from a state store? I want to read all current key-value pairs in the state store, query my transactional DB for the batch, and then delete all processed key-value pairs from state store. – prixa Jan 31 '20 at 06:38
  • You would need to call `delete(key)` for each one individually – Matthias J. Sax Jan 31 '20 at 18:36
  • Okay, what happens if after I delete key from state store, my transactional DB query throws an error? Is there a possibility of missing processing the keys that were already drained from state store (into an in memory set perhaps)? Updated code above to use state store. – prixa Feb 01 '20 at 01:00
  • Yes that could happen. In general, it's hard to integrate an external system and to cover all corner cases. – Matthias J. Sax Feb 01 '20 at 03:25
  • I see, was hoping I can maintain at least once delivery guarantee here. I would imagine in the scenario I mention above, when transactional DB query throws an error, the offset on the input kafka topic will not be updated and even though state store entries are deleted, the app would read those records again from input kafka topic. Please correct me if my understanding is incorrect. – prixa Feb 02 '20 at 02:54
  • 1
    You can get at least once semantics by first letting the external DB transaction commit, and afterward delete data from the local Kafka Streams store. -- For the case you mention in your comment, you are right: if the DB transaction fails and you re-throw an exception and let Kafka Streams dies, offsets would not be recorded. – Matthias J. Sax Feb 02 '20 at 08:42
  • Sounds good. Updated the code above to delete state store entries after external DB transaction is committed. I think I read on one of the SO links that punctuate and transform (using TransformerSupplier now) are executed on the same thread, so would it be safe to assume that stateStore will not have new records until punctuate has finished executing (that could be potentially deleted if this were not the case)? – prixa Feb 03 '20 at 11:33
  • That is correct -- regular processing and punctuations are executed on the same thread and thus you don't need to worry about thread safety/concurrency/race conditions. – Matthias J. Sax Feb 04 '20 at 06:17
  • Thanks @MatthiasJ.Sax for all the help. I will update the final code I use in the question above, once I test it. If you would like, please write an answer to original question I had, so I can accept that as the solution. – prixa Feb 06 '20 at 11:09
  • 1
    Feel free to post your solution as answer an accepted your own answer :) – Matthias J. Sax Feb 07 '20 at 18:11

0 Answers0