For Kafka Streams, if we use the lower-level Processor API, we can control whether to commit or not. So if a problem happens in our code and we don't want to commit the current message, Kafka will redeliver it until the problem gets fixed.

But how can we control whether to commit a message when using the higher-level Streams DSL API?

Resources:

http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

OneCricketeer
jeffery.yuan

1 Answer

Your statement is not completely true. You cannot "control whether to commit or not" -- at least not directly (neither in the Processor API nor in the DSL). You can only use ProcessorContext#commit() to request additional commits. Thus, after a call to #commit(), Streams tries to commit as soon as possible, but it's not an immediate commit. Furthermore, Streams will commit automatically even if you never call #commit(). You can control the Streams commit interval via the configuration parameter commit.interval.ms (cf. http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application)
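A minimal sketch of the two knobs described above. The `commit.interval.ms` key is the real Kafka Streams config name; the application id and broker address are illustrative, and the `ProcessorContext#commit()` call is shown only as a comment since it requires a running topology:

```java
import java.util.Properties;

public class CommitConfigSketch {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        // Illustrative values for a hypothetical app; only the keys are real.
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Streams commits automatically on an interval (default 30000 ms);
        // here we lower it to 10 seconds.
        props.put("commit.interval.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("commit.interval.ms")); // prints 10000
        // Inside a Processor you could additionally call:
        //   context.commit();  // requests a commit "as soon as possible", not immediately
    }
}
```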

In case of a "problem", how to respond depends on the type of problem you have:

  • if you detect a problem you cannot recover from, you can only throw an exception and "stop the world" (cf. below).
  • if you have a recoverable error, you need to "loop" within your own code (e.g., within Processor#process() or KeyValueMapper#apply()) until the problem is resolved and you can successfully process the current message (note that you might run into a timeout, i.e., an exception, using this strategy -- cf. consumer configs heartbeat.interval.ms and, for 0.10.1, session.timeout.ms [KIP-62])
  • an alternative would be to put records that cannot be processed right now into a StateStore and process them later on. However, this is hard to get right and also breaks a few Streams assumptions (e.g., processing order). It's not recommended, and if used, you must be very careful about the implications
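The "loop until resolved" strategy from the second bullet can be sketched in plain Java. The flaky external call and the backoff are made up for illustration; in a real Processor#process() this loop would block the StreamThread, so it must stay under the consumer's heartbeat/session timeouts mentioned above:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RetryLoopSketch {
    // Simulated external call that fails twice before succeeding (illustrative only).
    static final AtomicInteger attempts = new AtomicInteger();

    static boolean sendToExternalService(String record) {
        return attempts.incrementAndGet() > 2; // fails on attempts 1 and 2
    }

    // Block inside user code until the record is processed successfully.
    // In Kafka Streams this loop would run inside Processor#process(), where a
    // long-running retry risks tripping the consumer's timeout configs.
    static int processWithRetry(String record) {
        attempts.set(0); // reset the simulated failure counter
        int tries = 0;
        while (true) {
            tries++;
            if (sendToExternalService(record)) {
                return tries;
            }
            try {
                Thread.sleep(10); // simple fixed backoff; tune for real workloads
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(processWithRetry("some-record")); // prints 3
    }
}
```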

If there is an uncaught exception, the StreamThread will die and no commit happens (you can register an exception handler to get notified about this: http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-streams-within-your-application-code). If all your StreamThreads have died, you will need to create a new instance of KafkaStreams to restart your application.
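The handler mentioned here is registered via KafkaStreams#setUncaughtExceptionHandler, which in these releases takes a plain Thread.UncaughtExceptionHandler. A broker-free sketch of the same notification mechanism, with an ordinary thread standing in for a StreamThread (the error message is made up):

```java
import java.util.concurrent.atomic.AtomicReference;

public class ExceptionHandlerSketch {
    public static String runAndCapture() {
        AtomicReference<String> captured = new AtomicReference<>();

        // Stand-in for a StreamThread that hits an unrecoverable error.
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("deserialization failed"); // illustrative error
        });

        // KafkaStreams#setUncaughtExceptionHandler accepts this same interface;
        // it only notifies you -- the dead thread is not restarted automatically.
        worker.setUncaughtExceptionHandler((t, e) -> captured.set(e.getMessage()));

        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return captured.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndCapture()); // prints: deserialization failed
    }
}
```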

You must not return from user code before a message got successfully processed, because if you return, Streams assumes the message was successfully processed (and thus might commit the corresponding offset). With regard to the third bullet point above, putting a record into a special StateStore for later processing counts as a "successfully" processed record.

Matthias J. Sax
  • Thanks so much, Matthias. Your comments made me understand more about the internals of Kafka. For a recoverable error (for example, when writing data to a destination server that is down at the time), we choose to retry forever until it gets fixed. This blocks the consumer, but it's fine in our case. – jeffery.yuan Feb 07 '17 at 08:30
  • Could you provide a link to some documentation on how KafkaStreams handles offset commits? It sounds like you are saying that the offset will not be committed until after the user code finishes, and will not be committed if the user code throws an error. Is that correct? – Rylander Sep 11 '17 at 22:09
  • What you are saying is correct. Unfortunately, there is no documentation available (only the code... if you want to dig into it, check out the `StreamThread#maybeCommit()` method). – Matthias J. Sax Sep 12 '17 at 16:59
  • @MatthiasJ.Sax It's been a while since this question was answered, but isn't it also an option to turn off auto-commits? (in the case of the Processor API at least) – burk Oct 03 '18 at 14:57
  • You could set the `commit.interval.ms` config to `Long.MAX_VALUE` -- this effectively prevents Kafka Streams from committing automatically; it will only commit after you call `context#commit()`. – Matthias J. Sax Oct 03 '18 at 16:46
  • @MatthiasJ.Sax or just set `enable.auto.commit` to `false` in the Processor API? – burk Oct 04 '18 at 06:54
  • Oh, that's a property on `ConsumerConfig`, so I guess that might be ignored by the Processor API? – burk Oct 04 '18 at 06:57
  • KS sets `enable.auto.commit` to `false` by default anyway. However, KS has its own commit logic (i.e., it "manually" calls Consumer#commit() internally every `commit.interval.ms`). Thus, from the consumer's point of view we use manual committing, but from a Kafka Streams point of view it's still automatic, and there is no Kafka Streams config to disable it. – Matthias J. Sax Oct 04 '18 at 14:24
  • @MatthiasJ.Sax If we set the StreamsConfig.COMMIT_INTERVAL_MS_CONFIG config to Long.MAX_VALUE, then it will commit only after context().commit(), correct? And what about messages stuck in the topic due to a malformed format -- how do we commit them or clear them from the topic? – Abdul Gaffar Khan Aug 12 '20 at 13:38
  • In Kafka, you don't have to commit each message individually, but committing offset with offset X implies that all messages with offset smaller than X are committed as well. Thus, if a message with offset Y is corrupted, and you commit the next message at offset Y+1, the corrupted message is committed, too. – Matthias J. Sax Aug 15 '20 at 04:46
  • Right now we are setting COMMIT_INTERVAL_MS_CONFIG to 1000, but in our application there are scenarios where an exception occurs when we call another service, or the pod may get restarted, so there is a chance of message loss. To handle message loss, we are thinking of turning off auto-commit and using context.commit() at the end of the complete process. – Abdul Gaffar Khan Aug 15 '20 at 15:04
  • Not sure why you would lose a message in this case. If you cannot call the external service and your KS app dies, it won't commit any offsets. And if you don't want to let it die and want to retry the message later, state stores (and maybe punctuations) are your friend. -- Committing manually with high frequency implies high overhead and is not recommended. – Matthias J. Sax Aug 16 '20 at 19:06
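The comment thread boils down to one recipe: there is no Streams config to disable its own committing, but pushing the commit interval to Long.MAX_VALUE makes explicit ProcessorContext#commit() calls the only practical trigger. A config-only sketch (the key is real, i.e. StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; everything else is stdlib):

```java
import java.util.Properties;

public class ManualCommitConfigSketch {
    public static Properties effectivelyManualCommit() {
        Properties props = new Properties();
        // With the interval at Long.MAX_VALUE, interval-based commits effectively
        // never fire, so commits happen only after ProcessorContext#commit() calls.
        props.put("commit.interval.ms", Long.toString(Long.MAX_VALUE));
        // Note: Streams already sets the consumer's enable.auto.commit to false
        // internally; overriding that consumer config does not disable Streams'
        // own commit logic.
        return props;
    }

    public static void main(String[] args) {
        System.out.println(
            effectivelyManualCommit().getProperty("commit.interval.ms")); // prints 9223372036854775807
    }
}
```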