
Is there a way to commit manually with Kafka Streams?

Usually, when using the KafkaConsumer, I do something like the following:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // process records
    }
    consumer.commitAsync();
}

Here I'm calling commit manually. I don't see a similar API for KStream.

Glide

1 Answer


Commits are handled internally by Streams and are fully automatic, so there is usually no reason to commit manually. Note that Streams handles this differently than consumer auto-commit -- in fact, auto-commit is disabled for the internally used consumer and Streams manages commits "manually". The reason is that commits can only happen at certain points during processing to ensure no data can get lost (there are many internal dependencies with regard to updating state and flushing results).

For more frequent commits, you can reduce the commit interval via the StreamsConfig parameter commit.interval.ms.
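As an illustration (a minimal sketch; the application id and bootstrap server are placeholders), a shorter commit interval is set like any other Streams config before creating the KafkaStreams instance:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
// commit every second instead of the default 30000 ms (100 ms with exactly-once enabled)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

These properties are then passed to the KafkaStreams constructor as usual.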

Nevertheless, manual commits are possible indirectly via the low-level Processor API. You can use the context object that is provided via the init() method to call context#commit(). Note that this is only a "request to Streams" to commit as soon as possible -- it does not issue a commit directly.
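As a rough sketch of this approach (using the org.apache.kafka.streams.processor.api package of newer releases; the class name is made up), a processor can request a commit from within process():

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class CommitRequestingProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context; // keep the context to request commits later
    }

    @Override
    public void process(Record<String, String> record) {
        // ... process the record ...
        context.forward(record);
        context.commit(); // only a request -- Streams commits at the next safe point
    }
}

Such a processor can be wired into a topology via Topology#addProcessor() or, in recent releases, via KStream#process().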

Matthias J. Sax
  • What happens if there is an exception and the stream app crashes? Starting the stream app again will lead to consuming the same message, and the loop will go on until we delete the topic and re-create it. – Arpan Sharma Oct 12 '19 at 13:35
  • Yes, if your application crashes and is restarted, it will resume processing from the latest committed offset (similar to `KafkaConsumer`, which is in fact used internally). -- Not sure what you mean by "until we delete the topic and re-create it"? How does committing offsets relate to deleting/re-creating topics? – Matthias J. Sax Oct 12 '19 at 19:23
  • The issue was that I was reading a specific message that had special characters in it, and consuming this message was leading my stream app to crash. When the app started again, it consumed the same message and crashed, and the loop continued. My point is: can we not manually commit the message while catching an exception and proceed with the next message? – Arpan Sharma Oct 14 '19 at 04:26
  • 4
    Not within the application. Depending _when_ your application encounters the issue, you can maybe use a `DeserializationExceptionHandler`: https://docs.confluent.io/current/streams/developer-guide/config-streams.html#default-deserialization-exception-handler -- Or you might be able to catch the exception and "swallow" it. – Matthias J. Sax Oct 14 '19 at 07:12
  • Thanks got it now!! Much appreciated – Arpan Sharma Oct 14 '19 at 07:46
  • Any application exception thrown within a Kafka Streams application may cause duplicate messages (offsets and message commits going out of sync) if the exception is not a deserialization or production exception. I have tested this with Kafka 2.5, even with the processing guarantee set to EOS. Issuing context.commit() did not help. – Balan Jul 07 '20 at 02:14
  • @Balan With EOS, you should never get any duplicates in your output. When you verified your output, did you configure the consumer with `read_committed` mode? – Matthias J. Sax Jul 07 '20 at 05:10
  • @MatthiasJ.Sax Agreed, if we do read-committed, it would filter out uncommitted transactions. That said, some of the delivered interceptors and even the replicator do not support setting the isolation level for the consumer, so this is still an issue when you look at the overall ecosystem. – Balan Jul 08 '20 at 21:24
  • Well, if some components do not support the correct isolation level, there is not much you can do about it. EOS only works as advertised if all components support it. What replicator are you using? Wondering why it does not support isolation level "read-committed" -- and note that interceptors that trigger side effects are not covered by the EOS guarantee by design. – Matthias J. Sax Jul 08 '20 at 21:31
  • @MatthiasJ.Sax I verified with Confluent Replicator 5.5, which did not have the option to override the isolation level for the consumer (or rather, it did not use the provided isolation level). I will retest this with MM2. – Balan Sep 04 '20 at 13:20
  • Confluent replicator should allow you to set the consumer configs: https://docs.confluent.io/current/multi-dc-deployments/replicator/replicator-run.html#origin-cluster-configuration – Matthias J. Sax Sep 04 '20 at 16:21
  • @MatthiasJ.Sax How can I manually commit a message in the uncaughtExceptionHandler if... the message caused an error during the aggregation process? It cannot be validated with a DeserializationExceptionHandler. I now have an infinite loop of restarting the stream app, but if I could somehow omit/skip this corrupted message...? – Ernesto Jul 13 '22 at 13:56
  • You can't. -- If you want to skip such a corrupted message, you need to insert a `transform()` that uses `forward()` -- you can wrap `forward()` with a try-catch to swallow the exception and effectively drop the bad record (a rough sketch follows below). – Matthias J. Sax Jul 13 '22 at 15:58
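To illustrate the idea from the last comment (a rough sketch only, written against the newer Processor API; the class name and error handling are made up), the trick is to wrap forward() in a try-catch so that an exception thrown by downstream processing is swallowed and the record is dropped:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class DropBadRecordsProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        try {
            // forward() invokes the downstream processors synchronously, so an
            // exception thrown there (e.g., by the aggregation) surfaces here
            context.forward(record);
        } catch (RuntimeException e) {
            // swallow the exception to effectively drop the bad record;
            // a real application would at least log it
            System.err.println("Dropping bad record with key " + record.key() + ": " + e);
        }
    }
}

In recent releases, such a processor can be placed upstream of the aggregation via KStream#process(); in older releases the same idea applies with transform() and a Transformer that calls context.forward() and returns null.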