I noticed that my Kafka Streams application will enter the ERROR state after being unable to communicate with Kafka for some period of time. I want to find a way to make Kafka Streams essentially "retry forever" instead of entering the ERROR state. The only work-around is to restart my Kafka Streams application, which is not ideal.
I set request.timeout.ms=2147483647 in my Kafka Streams configuration. This helps: the application used to go into the ERROR state after about a minute, and now it happens less frequently, but it still does eventually.
This is my Kafka Streams configuration:
commit.interval.ms: 10000
cache.max.bytes.buffering: 0
retries: 2147483647
request.timeout.ms: 2147483647
retry.backoff.ms: 5000
num.stream.threads: 1
state.dir: /tmp/kafka-streams
producer.batch.size: 102400
producer.max.request.size: 31457280
producer.buffer.memory: 314572800
producer.max.in.flight.requests.per.connection: 10
producer.linger.ms: 0
consumer.max.partition.fetch.bytes: 31457280
consumer.receive.buffer.bytes: 655360
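For reference, here is a minimal sketch of how the settings above could be assembled in Java as plain string properties. The class and method names (StreamsRetryConfig, build, withConsumerApiTimeout) are hypothetical and only for illustration. The second method rests on an assumption I have not verified: the 60000 ms in the TimeoutException in the log matches the default of the consumer's default.api.timeout.ms, which is not set in my list, so raising it might be what is still missing.

```java
import java.util.Properties;

// Hypothetical helper; it only builds the properties and does not start Kafka Streams.
final class StreamsRetryConfig {

    // The settings exactly as listed above, kept as strings so no Kafka classes are needed.
    static Properties build() {
        Properties p = new Properties();
        p.setProperty("commit.interval.ms", "10000");
        p.setProperty("cache.max.bytes.buffering", "0");
        p.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
        p.setProperty("request.timeout.ms", Integer.toString(Integer.MAX_VALUE));
        p.setProperty("retry.backoff.ms", "5000");
        p.setProperty("num.stream.threads", "1");
        p.setProperty("state.dir", "/tmp/kafka-streams");
        p.setProperty("producer.batch.size", "102400");
        p.setProperty("producer.max.request.size", "31457280");
        p.setProperty("producer.buffer.memory", "314572800");
        p.setProperty("producer.max.in.flight.requests.per.connection", "10");
        p.setProperty("producer.linger.ms", "0");
        p.setProperty("consumer.max.partition.fetch.bytes", "31457280");
        p.setProperty("consumer.receive.buffer.bytes", "655360");
        return p;
    }

    // ASSUMPTION (untested): raising the consumer's default.api.timeout.ms may lift the
    // 60000 ms limit seen on the offset commit. Returns a copy; the base is not modified.
    static Properties withConsumerApiTimeout(Properties base, int timeoutMs) {
        Properties p = new Properties();
        p.putAll(base);
        p.setProperty("consumer.default.api.timeout.ms", Integer.toString(timeoutMs));
        return p;
    }

    public static void main(String[] args) {
        Properties p = withConsumerApiTimeout(build(), Integer.MAX_VALUE);
        // Print the effective settings in a stable order for inspection.
        p.stringPropertyNames().stream().sorted()
            .forEach(k -> System.out.println(k + "=" + p.getProperty(k)));
    }
}
```

In the real application these properties would also need application.id and bootstrap.servers before being passed to the KafkaStreams constructor; both are omitted here because they are not part of the list above.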
This is the relevant part of the log from Kafka Streams:
[2019-06-07T22:18:07,223Z {StreamThread-1} WARN org.apache.kafka.clients.NetworkClient] [Consumer clientId=StreamThread-1-consumer, groupId=app-stream] 20 partitions have leader brokers without a matching listener, including [app-stream-tmp-store-changelog-5, app-stream-tmp-store-changelog-13, app-stream-tmp-store-changelog-9, app-stream-tmp-store-changelog-1, __consumer_offsets-10, __consumer_offsets-30, __consumer_offsets-18, __consumer_offsets-22, __consumer_offsets-34, __consumer_offsets-6]
[2019-06-07T22:18:08,662Z {StreamThread-1} ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks] stream-thread [StreamThread-1] Failed to commit stream task 0_14 due to the following error:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {global-14=OffsetAndMetadata{offset=33038702, leaderEpoch=null, metadata=''}}
[2019-06-07T22:18:08,662Z {StreamThread-1} ERROR org.apache.kafka.streams.processor.internals.StreamThread] stream-thread [StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {global-2=OffsetAndMetadata{offset=25537237, leaderEpoch=null, metadata=''}}
[2019-06-07T22:18:08,662Z {StreamThread-1} INFO org.apache.kafka.streams.processor.internals.StreamThread] stream-thread [StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
[2019-06-07T22:18:08,662Z {StreamThread-1} INFO org.apache.kafka.streams.processor.internals.StreamThread] stream-thread [StreamThread-1] Shutting down
[2019-06-07T22:18:08,704Z {StreamThread-1} INFO org.apache.kafka.clients.consumer.KafkaConsumer] [Consumer clientId=StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
[2019-06-07T22:18:08,704Z {StreamThread-1} INFO org.apache.kafka.clients.producer.KafkaProducer] [Producer clientId=StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2019-06-07T22:18:08,728Z {StreamThread-1} INFO org.apache.kafka.streams.processor.internals.StreamThread] stream-thread [StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
[2019-06-07T22:18:08,728Z {StreamThread-1} INFO org.apache.kafka.streams.KafkaStreams] stream-client [usxapgutpd01-] State transition from RUNNING to ERROR
[2019-06-07T22:18:08,728Z {StreamThread-1} ERROR org.apache.kafka.streams.KafkaStreams] stream-client [usxapgutpd01-] All stream threads have died. The instance will be in error state and should be closed.
[2019-06-07T22:18:08,728Z {StreamThread-1} INFO org.apache.kafka.streams.processor.internals.StreamThread] stream-thread [StreamThread-1] Shutdown complete