
I noticed that my Kafka Streams application enters the ERROR state after it is unable to communicate with Kafka for some period of time. I want to find a way to make Kafka Streams essentially "retry forever" instead of entering the ERROR state. My only work-around so far is to restart the Kafka Streams application, which is not ideal.

I set request.timeout.ms=2147483647 in my Kafka Streams configuration, and this helps: the application used to enter the ERROR state after about a minute, and now it happens less often, but it still happens eventually.

This is my Kafka Streams configuration:

 commit.interval.ms: 10000
 cache.max.bytes.buffering: 0
 retries: 2147483647
 request.timeout.ms: 2147483647
 retry.backoff.ms: 5000
 num.stream.threads: 1
 state.dir: /tmp/kafka-streams
 producer.batch.size: 102400
 producer.max.request.size: 31457280
 producer.buffer.memory: 314572800
 producer.max.in.flight.requests.per.connection: 10
 producer.linger.ms: 0
 consumer.max.partition.fetch.bytes: 31457280
 consumer.receive.buffer.bytes: 655360
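
For reference, this configuration is wired into the application roughly as follows. This is a minimal sketch: the application id, bootstrap servers, topic names, and topology are placeholders, not the real ones.

```java
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class App {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-stream");     // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder

        // Settings as listed above
        props.put("commit.interval.ms", 10000);
        props.put("cache.max.bytes.buffering", 0);
        props.put("retries", Integer.MAX_VALUE);            // 2147483647
        props.put("request.timeout.ms", Integer.MAX_VALUE); // 2147483647
        props.put("retry.backoff.ms", 5000);
        props.put("num.stream.threads", 1);
        props.put("state.dir", "/tmp/kafka-streams");

        // The "producer."/"consumer." prefixes forward a setting to the
        // embedded producer/consumer clients only:
        props.put("producer.batch.size", 102400);
        props.put("producer.max.request.size", 31457280);
        props.put("producer.buffer.memory", 314572800);
        props.put("producer.max.in.flight.requests.per.connection", 10);
        props.put("producer.linger.ms", 0);
        props.put("consumer.max.partition.fetch.bytes", 31457280);
        props.put("consumer.receive.buffer.bytes", 655360);

        // Placeholder topology; the real one is omitted here.
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");
        final Topology topology = builder.build();

        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    }
}
```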

This is the relevant part of the log from Kafka Streams:

[2019-06-07T22:18:07,223Z {StreamThread-1} WARN  org.apache.kafka.clients.NetworkClient] [Consumer clientId=StreamThread-1-consumer, groupId=app-stream] 20 partitions have leader brokers without a matching listener, including [app-stream-tmp-store-changelog-5, app-stream-tmp-store-changelog-13, app-stream-tmp-store-changelog-9, app-stream-tmp-store-changelog-1, __consumer_offsets-10, __consumer_offsets-30, __consumer_offsets-18, __consumer_offsets-22, __consumer_offsets-34, __consumer_offsets-6]
[2019-06-07T22:18:08,662Z {StreamThread-1} ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks] stream-thread [StreamThread-1] Failed to commit stream task 0_14 due to the following error:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {global-14=OffsetAndMetadata{offset=33038702, leaderEpoch=null, metadata=''}}
[2019-06-07T22:18:08,662Z {StreamThread-1} ERROR org.apache.kafka.streams.processor.internals.StreamThread] stream-thread [StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {global-2=OffsetAndMetadata{offset=25537237, leaderEpoch=null, metadata=''}}
[2019-06-07T22:18:08,662Z {StreamThread-1} INFO  org.apache.kafka.streams.processor.internals.StreamThread] stream-thread [StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
[2019-06-07T22:18:08,662Z {StreamThread-1} INFO  org.apache.kafka.streams.processor.internals.StreamThread] stream-thread [StreamThread-1] Shutting down
[2019-06-07T22:18:08,704Z {StreamThread-1} INFO  org.apache.kafka.clients.consumer.KafkaConsumer] [Consumer clientId=StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
[2019-06-07T22:18:08,704Z {StreamThread-1} INFO  org.apache.kafka.clients.producer.KafkaProducer] [Producer clientId=StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2019-06-07T22:18:08,728Z {StreamThread-1} INFO  org.apache.kafka.streams.processor.internals.StreamThread] stream-thread [StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
[2019-06-07T22:18:08,728Z {StreamThread-1} INFO  org.apache.kafka.streams.KafkaStreams] stream-client [usxapgutpd01-] State transition from RUNNING to ERROR
[2019-06-07T22:18:08,728Z {StreamThread-1} ERROR org.apache.kafka.streams.KafkaStreams] stream-client [usxapgutpd01-] All stream threads have died. The instance will be in error state and should be closed.
[2019-06-07T22:18:08,728Z {StreamThread-1} INFO  org.apache.kafka.streams.processor.internals.StreamThread] stream-thread [StreamThread-1] Shutdown complete
  • Please take a look at https://stackoverflow.com/questions/51299528/handling-exceptions-in-kafka-streams/51299739#51299739. If any exception occurs, either while producing a message or while processing/transforming a message, Streams moves into the `ERROR` state and stops processing. To handle that, you need to implement a `ProductionExceptionHandler` (see the sketch after these comments). – Vasyl Sarzhynskyi Jun 12 '19 at 15:44
  • I don't think that the other question helps here. You might want to increase `default.api.timeout.ms`? – Matthias J. Sax Jun 12 '19 at 17:41
  • @VasiliySarzhynskyi If I use the `ProductionExceptionHandler` and return `ProductionExceptionHandlerResponse.CONTINUE` (as in the example), then, if I'm interpreting this correctly, wouldn't that just skip the record, resulting in data loss? I want a way to never skip a record, but merely keep retrying the Kafka connection. – Nathan Linebarger Jun 12 '19 at 17:55
  • @MatthiasJ.Sax `default.api.timeout.ms` looks promising (as a consumer config for Kafka Streams, I guess that would be `consumer.default.api.timeout.ms`). I will try it. – Nathan Linebarger Jun 12 '19 at 17:57
  • With `ProductionExceptionHandlerResponse.CONTINUE` the record will be skipped and not reprocessed, but you have the possibility to handle it in an appropriate way, such as putting the message into another topic to produce it again later. Or you could try to increase the timeout. – Vasyl Sarzhynskyi Jun 12 '19 at 17:59
  • The goal of the question is to avoid the handler being called in the first place. – Matthias J. Sax Jun 12 '19 at 23:54
  • @NathanLinebarger Did you overcome this issue? Did `default.api.timeout.ms` help? – Debojit Paul Feb 24 '20 at 12:27
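
To make the two suggestions in the comments concrete, here is a minimal sketch. The handler class name and the `register` helper are hypothetical, and returning `CONTINUE` is only to illustrate the API; as discussed above, it skips the failed record.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Hypothetical handler name; decides what Streams does when a send fails.
public class RetryAwareProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // CONTINUE skips the failed record (i.e. potential data loss, as
        // discussed above); FAIL, the default, shuts the stream thread down.
        // A real handler could first forward the record to a dead-letter topic.
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    // Hypothetical helper showing how both suggestions would be wired in:
    public static void register(final Properties props) {
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  RetryAwareProductionExceptionHandler.class);
        // Raises the limit behind "Timeout of 60000ms expired before successfully
        // committing offsets" (the consumer's default.api.timeout.ms, default 60 s):
        props.put("consumer.default.api.timeout.ms", Integer.MAX_VALUE);
    }
}
```

Note that, per the comment from Matthias J. Sax, the handler only reacts after a failure; raising `consumer.default.api.timeout.ms` aims to avoid the commit timeout (and hence the handler) in the first place.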
