We have multiple Kafka clusters dedicated to our data pipeline. One of our production clusters was set up a year ago. The data path is:
source topic -> Kafka Streams -> destination topics -> Kafka Connect
We started running into problems where the Kafka Connect worker fails to initialize or the Kafka Streams application fails to process data, because the consumers are stuck in a perpetual rebalancing state after a few minutes. It starts with a timeout error that surfaces as an IllegalStateException.
It appears that the issue of the TimeoutException being rethrown as an IllegalStateException was addressed in v3.4.1 and 3.5 (line# 1189).
Exception in thread "event-app-t1-72620c44-7799-437c-8efb-8a72c31767af-StreamThread-4" org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:626)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
Caused by: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:472)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:474)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:385)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:552)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1276)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:969)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:909)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:735)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
... 1 more
Caused by: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition event-app-t1-KSTREAM-AGGREGATE-STATE-STORE-0000000024-repartition-3 could be determined
at org.apache.kafka.streams.processor.internals.StreamTask.commitNeeded(StreamTask.java:1187)
at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:532)
at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:95)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:443)
... 11 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition event-app-t1-KSTREAM-AGGREGATE-STATE-STORE-0000000024-repartition-3 could be determined
We have tried the following so far:
- Changed the following parameters (a configuration sketch follows this list):
  max.poll.records = 500
  max.poll.interval.ms = 600000
  session.timeout.ms = 100000
  api.timeout.ms = 300000
- Moved the temporary storage to a persistent volume so that a subsequent restart of Kafka Streams can rebuild the RocksDB store incrementally. This didn't help.
- Switched Kafka Connect and Kafka Streams to v3.4.1 to leverage the fix above so that the TimeoutException is swallowed. This didn't help either, as the state still changes from RUNNING to SUSPENDED -> SHUTTING DOWN after an exception.
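Roughly, the overrides are applied like this in the Streams configuration. This is a simplified sketch: the application id, bootstrap servers, and state directory path are placeholders, and we are assuming the api.timeout.ms above maps to the consumer's default.api.timeout.ms.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsTuning {
    public static Properties buildConfig() {
        Properties props = new Properties();
        // Placeholders, not the exact values from our deployment.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-app-t1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dpkafka01:9092,dpkafka02:9092,dpkafka03:9092");

        // State store directory on the persistent volume so RocksDB survives restarts.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/streams-state");

        // Consumer-side tuning; Streams forwards these to its internal consumers.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600_000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 100_000);
        // Assumption: the "api.timeout.ms" we tuned is the consumer's default.api.timeout.ms,
        // which bounds position()/committed() lookups (the call timing out in the traces).
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 300_000);
        return props;
    }
}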
The only change that has helped us move past some of the timeouts so far is deleting the topics where the timeout occurs and recreating them. This makes us believe that something on the topic is causing the issue, as a side effect of how old the topic is or how much data has passed through it. Deleting a topic once every 18 months in a production environment is not feasible, and we believe the root cause has not been identified yet.
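To rule out topic-level configuration drift before resorting to deletion, the effective configuration of an affected topic can be dumped with the AdminClient. A minimal sketch: the broker address is a placeholder, and the topic name is the internal repartition topic from the first trace.

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class TopicConfigCheck {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        // Placeholder broker address for one of the cluster nodes.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "dpkafka01:9092");
        try (Admin admin = Admin.create(props)) {
            // Internal topic behind the failing partition in the first stack trace.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC,
                    "event-app-t1-KSTREAM-AGGREGATE-STATE-STORE-0000000024-repartition");
            Map<ConfigResource, Config> configs =
                    admin.describeConfigs(List.of(topic)).all().get();
            // Print effective retention/cleanup settings to compare against broker defaults.
            configs.get(topic).entries().forEach(e ->
                    System.out.println(e.name() + " = " + e.value()));
        }
    }
}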
The current set of timeouts looks something like this:
[event-app-t1-5a919f07-826b-4527-8329-047d96fce7f6-StreamThread-1-producer] INFO org.apache.kafka.clients.Metadata - [Producer clientId=event-app-t1-5a919f07-826b-4527-8329-047d96fce7f6-StreamThread-1-producer] Resetting the last seen epoch of partition event-app-t1-KSTREAM-AGGREGATE-STATE-STORE-0000000090-changelog-1 to 0 since the associated topicId changed from null to AlNoTK-HQE-_qTVdA2Dvcg
[event-app-t1-5a919f07-826b-4527-8329-047d96fce7f6-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.TaskManager - stream-thread [event-app-t1-5a919f07-826b-4527-8329-047d96fce7f6-StreamThread-1] Error flushing caches of dirty task 1_1
java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 300000ms expired before the position for partition some_messages-1 could be determined
at org.apache.kafka.streams.processor.internals.StreamTask.findOffset(StreamTask.java:427)
at org.apache.kafka.streams.processor.internals.StreamTask.committableOffsetsAndMetadata(StreamTask.java:456)
at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:402)
at org.apache.kafka.streams.processor.internals.TaskManager.closeTaskDirty(TaskManager.java:1244)
at org.apache.kafka.streams.processor.internals.TaskManager.closeAndCleanUpTasks(TaskManager.java:1391)
at org.apache.kafka.streams.processor.internals.TaskManager.lambda$shutdown$3(TaskManager.java:1289)
at org.apache.kafka.streams.processor.internals.TaskManager.executeAndMaybeSwallow(TaskManager.java:1812)
at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:1287)
at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1173)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:582)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 300000ms expired before the position for partition some_messages-1 could be determined
[event-app-t1-5a919f07-826b-4527-8329-047d96fce7f6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamTask - stream-thread [event-app-t1-5a919f07-826b-4527-8329-047d96fce7f6-StreamThread-1] task [1_1] Suspended from RUNNING
Kafka cluster: 3 nodes, KRaft mode, v3.3.1
Kafka Streams: v3.4.1, single instance, 3 stream threads
Kafka Connect: v3.4.1, distributed mode, single instance, multiple connectors (Elasticsearch, BigQuery, Google Pub/Sub, HTTP)
Kafka cluster is created with:
KAFKA_ENABLE_KRAFT: 'yes'
KAFKA_KRAFT_CLUSTER_ID: 'xxxxxxxxxxxxxxxx'
KAFKA_CFG_PROCESS_ROLES: broker,controller
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_LISTENERS: CONTROLLER://:9093,INSIDE://:9092,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INSIDE:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@dpkafka01:9093,2@dpkafka02:9093,3@dpkafka03:9093
KAFKA_CFG_ADVERTISED_LISTENERS: INSIDE://:9092,EXTERNAL://_{HOSTIP}:9097
KAFKA_BROKER_ID: 1
KAFKA_CFG_NODE_ID: 1
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_HEAP_OPTS: "-Xmx1G -Xms256m"
KAFKA_LOG_DIRS: /bitnami/kafka/kafka-logs
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
KAFKA_LOG_RETENTION_MS: 7200000
KAFKA_LOG_SEGMENT_MS: 86400000
KAFKA_LOG_DELETE_RETENTION_MS: 7200000
KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000
KAFKA_LOG_CLEANUP_POLICY: "compact,delete"
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 12000
KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR: 4
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2
ALLOW_PLAINTEXT_LISTENER: 'yes'
We would appreciate some help in debugging this.
(Note: the network and related areas have been verified. The errors start appearing after a few minutes of processing data, and Kafka Streams switches from RUNNING to SUSPENDED. We also noticed the same timeout error from a client that connects to the Kafka cluster (screenshot below); this happens after the client has been reading data from the topic for a few minutes. To reiterate: this setup had been working without issues for the last 18 months, and the timeouts and related issues only started about a week ago.)
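For completeness, the same position lookup can be reproduced outside of Streams by pointing a plain consumer at one of the affected partitions. A minimal sketch: the broker address and group id are placeholders, and the partition is the one from the second trace.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PositionCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholders: broker address and group id are not from the real setup.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "dpkafka01:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "position-check");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // position() is the lookup that times out inside Streams, so this tries
        // the same call in isolation against the partition from the second trace.
        TopicPartition tp = new TopicPartition("some_messages", 1);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(List.of(tp));
            System.out.println("position = " + consumer.position(tp, Duration.ofSeconds(60)));
        }
    }
}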