
We have multiple Kafka clusters dedicated to our data pipeline. One of our production clusters was set up a year ago. The data path is roughly:

source topic -> Kafka Streams -> destination topics -> Kafka Connect
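
(For context, a minimal sketch of the shape of that path in Kafka Streams; the topic names and serdes are placeholders, not our real topology. The re-keyed aggregation is what creates the KSTREAM-AGGREGATE-STATE-STORE repartition and changelog internal topics that show up in the logs below.)

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Produced;

    public class TopologySketch {
        static StreamsBuilder build() {
            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("source-topic", Consumed.with(Serdes.String(), Serdes.String()))
                   // re-keying before the aggregation is what creates the
                   // ...-repartition and ...-changelog internal topics in the logs
                   .groupBy((key, value) -> value, Grouped.with(Serdes.String(), Serdes.String()))
                   .count()
                   .toStream()
                   .to("destination-topic", Produced.with(Serdes.String(), Serdes.Long()));
            return builder;
        }
    }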

We started seeing problems where the Kafka Connect worker fails to initialize, or Kafka Streams fails to process data, because the consumers end up stuck in a rebalancing state after a few minutes. It starts with a timeout error that surfaces as an IllegalStateException. The issue of the TimeoutException being thrown as an IllegalStateException appears to have been addressed in v3.4.1 and v3.5 (line# 1189).

 Exception in thread "event-app-t1-72620c44-7799-437c-8efb-8a72c31767af-StreamThread-4" org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:626)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
 Caused by: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:472)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:474)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:385)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:552)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1276)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:969)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:909)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:735)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
    ... 1 more
 Caused by: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition event-app-t1-KSTREAM-AGGREGATE-STATE-STORE-0000000024-repartition-3 could be determined
    at org.apache.kafka.streams.processor.internals.StreamTask.commitNeeded(StreamTask.java:1187)
    at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:532)
    at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:95)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:443)
    ... 11 more
 Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition event-app-t1-KSTREAM-AGGREGATE-STATE-STORE-0000000024-repartition-3 could be determined

We have tried the following so far:

  1. Changed the following consumer parameters (applied as in the sketch after this list):
  max.poll.records = 500
  max.poll.interval.ms = 600000
  session.timeout.ms = 100000
  api.timeout.ms = 300000

  2. Moved the temporary storage to a persistent volume, so that subsequent restarts of Kafka Streams rebuild the RocksDB store incrementally. This didn't help.

  3. Switched Kafka Connect and Kafka Streams to v3.4.1 to leverage the fix above, so that the TimeoutException is swallowed. This didn't help either; the state still changes from RUNNING to SUSPENDED to SHUTTING_DOWN after an exception.
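
For reference, this is roughly how we apply those overrides in the Streams app (a minimal sketch; the application id, bootstrap servers, and state directory are placeholders for our real values):

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsProps {
        static Properties buildProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-app-t1");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dpkafka01:9092,dpkafka02:9092,dpkafka03:9092");
            // state dir on the persistent volume, so RocksDB survives restarts (step 2)
            props.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/persistent/kafka-streams");

            // consumer-level overrides from step 1; note that the consumer key
            // for the API timeout is default.api.timeout.ms
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500);
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600000);
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 100000);
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 300000);
            return props;
        }
    }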

The only change that has helped us get past some of the timeouts so far is deleting the topics where the timeout occurs and recreating them. This makes us believe that something about the topic itself is causing the issue, as a side effect of the topic's age and the amount of data that has passed through it. But deleting topics in a production environment every 18 months is not feasible, and we believe the root cause has not been identified yet.
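
One way to check whether such a topic is in a visibly bad state (leader, replicas, in-sync replicas) is to describe it with the Java Admin client; a minimal sketch, with the bootstrap server and topic name as placeholders:

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;
    import org.apache.kafka.common.TopicPartitionInfo;

    public class TopicCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "dpkafka01:9092");
            try (Admin admin = Admin.create(props)) {
                TopicDescription desc = admin.describeTopics(List.of("some_messages"))
                                             .topicNameValues().get("some_messages").get();
                for (TopicPartitionInfo p : desc.partitions()) {
                    // a partition whose ISR is shorter than its replica list has a
                    // lagging replica, which can stall position lookups like ours
                    System.out.printf("partition=%d leader=%s replicas=%s isr=%s%n",
                            p.partition(), p.leader(), p.replicas(), p.isr());
                }
            }
        }
    }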

The current set of timeouts looks something like this:

[event-app-t1-5a919f07-826b-4527-8329-047d96fce7f6-StreamThread-1-producer] INFO org.apache.kafka.clients.Metadata - [Producer clientId=event-app-t1-5a919f07-826b-4527-8329-047d96fce7f6-StreamThread-1-producer] Resetting the last seen epoch of partition event-app-t1-KSTREAM-AGGREGATE-STATE-STORE-0000000090-changelog-1 to 0 since the associated topicId changed from null to AlNoTK-HQE-_qTVdA2Dvcg
[event-app-t1-5a919f07-826b-4527-8329-047d96fce7f6-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.TaskManager - stream-thread [event-app-t1-5a919f07-826b-4527-8329-047d96fce7f6-StreamThread-1] Error flushing caches of dirty task 1_1
java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 300000ms expired before the position for partition some_messages-1 could be determined
    at org.apache.kafka.streams.processor.internals.StreamTask.findOffset(StreamTask.java:427)
    at org.apache.kafka.streams.processor.internals.StreamTask.committableOffsetsAndMetadata(StreamTask.java:456)
    at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:402)
    at org.apache.kafka.streams.processor.internals.TaskManager.closeTaskDirty(TaskManager.java:1244)
    at org.apache.kafka.streams.processor.internals.TaskManager.closeAndCleanUpTasks(TaskManager.java:1391)
    at org.apache.kafka.streams.processor.internals.TaskManager.lambda$shutdown$3(TaskManager.java:1289)
    at org.apache.kafka.streams.processor.internals.TaskManager.executeAndMaybeSwallow(TaskManager.java:1812)
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:1287)
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1173)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:582)
 Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 300000ms expired before the position for partition some_messages-1 could be determined
 [event-app-t1-5a919f07-826b-4527-8329-047d96fce7f6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamTask - stream-thread [event-app-t1-5a919f07-826b-4527-8329-047d96fce7f6-StreamThread-1] task [1_1] Suspended from RUNNING

Our setup:

  • Kafka cluster: 3 nodes, KRaft mode, v3.3.1
  • Kafka Streams: v3.4.1, single instance, 3 stream threads
  • Kafka Connect: v3.4.1, distributed mode, single instance, multiple connectors (Elasticsearch, BigQuery, Google Pub/Sub, HTTP)

Kafka cluster is created with:

      KAFKA_ENABLE_KRAFT: 'yes'
      KAFKA_KRAFT_CLUSTER_ID: 'xxxxxxxxxxxxxxxx'
      KAFKA_CFG_PROCESS_ROLES: broker,controller
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_LISTENERS: CONTROLLER://:9093,INSIDE://:9092,EXTERNAL://:9094
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INSIDE:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@dpkafka01:9093,2@dpkafka02:9093,3@dpkafka03:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INSIDE://:9092,EXTERNAL://_{HOSTIP}:9097
      KAFKA_BROKER_ID: 1
      KAFKA_CFG_NODE_ID: 1
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_HEAP_OPTS: "-Xmx1G -Xms256m"
      KAFKA_LOG_DIRS: /bitnami/kafka/kafka-logs
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_LOG_RETENTION_MS: 7200000
      KAFKA_LOG_SEGMENT_MS: 86400000
      KAFKA_LOG_DELETE_RETENTION_MS: 7200000
      KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000
      KAFKA_LOG_CLEANUP_POLICY: "compact,delete"
      KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 12000
      KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR: 4
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
      KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2
      ALLOW_PLAINTEXT_LISTENER: 'yes'
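
(A minimal sanity check, using the Java Admin client, to confirm that all three brokers are registered and a controller is elected; the bootstrap server below is a placeholder.)

    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.DescribeClusterResult;

    public class ClusterCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "dpkafka01:9092");
            try (Admin admin = Admin.create(props)) {
                DescribeClusterResult cluster = admin.describeCluster();
                // all three brokers should be listed here; a missing or flapping
                // node would line up with the stalled position lookups we see
                System.out.println("controller: " + cluster.controller().get());
                cluster.nodes().get().forEach(n -> System.out.println("broker: " + n));
            }
        }
    }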

We would appreciate some help debugging this.

(Note: the network and related areas have been verified. The errors start appearing after a few minutes, once some data has been processed, and Kafka Streams switches from RUNNING to SUSPENDED. We also noticed the same timeout error from a client that connects to the Kafka cluster; this happens after it has been reading data from the topic for a few minutes. To reiterate: this setup worked with no issues for the last 18 months, and the timeouts and related issues started just a week ago.)

  • I assume you have some port mapping `9097:9094`? – OneCricketeer Aug 08 '23 at 21:03
  • Yes, as I mentioned above, the timeout happens only after a few minutes. Network-related issues are ruled out. – donnie Aug 09 '23 at 01:20
  • I suggest collecting your broker logs and looking for issues there, rather than just the client... Also, the Bitnami Kafka container just had a refactoring within the last month, so if you pulled a brand new image, that would align with when you are experiencing issues – OneCricketeer Aug 09 '23 at 13:51
  • It was a stuck replication with one of the Kafka nodes. We couldn't identify the reason, as the logs were not very helpful. We couldn't force a sync, either (using `replica.lag.time.max.ms`). Finally, we deleted and recreated that node, after which the replication resumed and all errors related to timeouts disappeared. – donnie Aug 15 '23 at 15:00
