18

I am trying to read messages on Kafka topic, but I am unable to read it. The process gets killed after sometime, without reading any messages.

Here is the rebalancing error which I get:

[2014-03-21 10:10:53,215] ERROR Error processing message, stopping consumer:  (kafka.consumer.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: topic-1395414642817-47bb4df2 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
    at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:752)
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:142)
    at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
    at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Consumed 0 messages

I tried to run ConsumerOffsetChecker, and this is the error which I get. I have no clue whatsoever, how to resolve this. Anybody, any idea?

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:9092 --topic mytopic --group  topic_group
Group           Topic                          Pid Offset          logSize         Lag             Owner
Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:459)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
        at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
        at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        ... 16 more
divinedragon
  • 5,105
  • 13
  • 50
  • 97
  • Given the message in the exception it seems to me that the Kafka consumer was able to connect to ZK, but it didn't find the /consumers path in there, which sounds like the ZK database was corrupted – Bogdan Mar 31 '14 at 10:15
  • I sometimes experience this error as well. However connecting via the console-consumer script as user2720864 outlines below works fine. – Mark Butler Jun 04 '14 at 16:16
  • This thread on the Kafka-users list talks about this issue http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAB0WE8ZbH2=7oW4kNcs5FDxL3iHLiiJairJvewG7C5h+1Kd21A@mail.gmail.com%3E – Mark Butler Jun 04 '14 at 16:38
  • is the issue resolved? if yes, how? – jack AKA karthik Oct 26 '17 at 06:51

5 Answers5

15

I have got similar problems recently. You can try to increase the consumer configurations rebalance.backoff.ms and zookeeper.session.timeout.ms to about 5-10 seconds.

The first parameter tell kafka to wait more before retrying rebalance. The second one tell kafka to be more patient while trying to connect to zookeeper.

Other configuration options can be found on the official documentation.

Minh-Triet LÊ
  • 1,374
  • 9
  • 17
  • Sorry, I misunderstood. First I tried changing these options at the server which did nothing. Then I changed them at the consumer. This allowed me to connect, but now I get a java.nio.channels.ClosedByInterruptException. On the list http://grokbase.com/t/kafka/users/141ej00nf9/kafka-consumer-connection-issue they point to the Wiki that says GC usage might be the culprit. https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyaretheremanyrebalancesinmyconsumerlog? I don't believe that in my case because the application is just starting: – Mark Butler Jun 04 '14 at 17:29
  • 1
    @MarkButler I have seen those errors on startup when there are other running consumers. Have you verified in ZK that the rebalancing is OK after all ? It is in the zk path /consumers/[group_name]/owners/[topic name] – Minh-Triet LÊ Jun 05 '14 at 16:08
  • I tried the above mentioned steps but nothing has changed, i'm still seeing the same error . – jack AKA karthik Apr 24 '17 at 06:22
2

This probably means that the brokers did not create those nodes correctly when it connected to Zookeeper. The /consumer path should exist when you try to consume.

Here are a few paths for debugging:

Did you create any topics?

If so:

  1. How many partitions are there in the topic?
  2. Did you check that the topic metadata were correctly populated in the zookeeper?
  3. Can we see your consumer configuration?

If not:

  1. Then you need to create a topic using the script $KAFKA_DIR/bin/kafka-create-topic.sh. Look inside the script for usage details.
  2. Once you make a topic, you need to create a consumer with a group ID that has not been used before, otherwise you will not start fresh.
laughing_man
  • 3,756
  • 1
  • 20
  • 20
1

There is a bug in kafka.tools.ConsumerOffsetChecker. If the a particular Zookeeper node holding consumed offset information doesn't exit, the tool exits throwing the execption.

For example, suppose you have a consumer group "mygroup" and a topic "topictest". Then the offset for partition 2 is maintained in Znode: /consumers/mygroup/offsets/topictest/2.

If there is no entry for partition 2 of topic topictest in Znode, then consumer offsetchecker tool will exit while checking offset for partition 2. Basically, it will fail while checking the first partition "n" for which the Znode /consumers/mygroup/offsets/topictest/n is missing on Zookeeper.

Nipun Talukdar
  • 4,975
  • 6
  • 30
  • 42
0

probably your brokers are offline and they are not able to connect to Zookeeper, have you tried running the console-consumer script available in the $KAFKA_ROOT_DIR/bin path for checking if you are able to consume from a specific topic.

user2720864
  • 8,015
  • 5
  • 48
  • 60
0

Another Issue might be because of jar conflicts. If you have the same jar with different versions stored in library folder. This Issue may arise. jars like scala-library ,zkclient, zookeeper, kafka-client should not be duplicated with different versions.

charan teja
  • 491
  • 5
  • 5