16

I have been trying this for two weeks now. I am running a Kafka cluster on separate machines from my Connect nodes, and I am unable to get Connect running properly. I can read from and write to Kafka with no issue, and ZooKeeper seems to be running fine.

I launch connect:

$ bin/connect-distributed connect-distributed.properties

Connect keeps looping through this error:

[2018-08-21 15:45:12,161] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,163] INFO [Worker clientId=c1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
[2018-08-21 15:45:12,165] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)
[2018-08-21 15:45:12,266] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.40.219:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,267] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)

Here is what my connect-distributed.properties looks like:

bootstrap.servers=172.25.1.2:9092,172.25.1.3:9092,172.25.1.4:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3

offset.flush.interval.ms=10000

rest.host.name=172.25.1.5
rest.port=8083

heartbeat.interval.ms=3000
session.timeout.ms=30000
security.protocol=PLAINTEXT
client.id=c1

plugin.path=/usr/share/java

The __consumer_offsets topic looks like this:

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets                                       
Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
    Topic: __consumer_offsets       Partition: 0    Leader: 2       Replicas: 1,2,3 Isr: 3,2
    Topic: __consumer_offsets       Partition: 1    Leader: 2       Replicas: 1,2,3 Isr: 3,2
    Topic: __consumer_offsets       Partition: 2    Leader: 2       Replicas: 1,2,3 Isr: 3,2
    Topic: __consumer_offsets       Partition: 3    Leader: 1       Replicas: 1     Isr: 1
    Topic: __consumer_offsets       Partition: 4    Leader: 2       Replicas: 2     Isr: 2.... etc
  • I don't think you should be setting `client.id` in the connect properties. At least it is not even mentioned in the property file https://github.com/apache/kafka/blob/trunk/config/connect-distributed.properties – OneCricketeer Aug 21 '18 at 18:28
  • "The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging." https://kafka.apache.org/documentation/ – ldrrp Aug 22 '18 at 13:09
  • I do not think it's what would be causing any issues; I have tried it without that setting beforehand without any luck. – ldrrp Aug 22 '18 at 13:10
  • @ldrrp - it seems it is not able to discover the Kafka node... are your Connect instance and Kafka instance in the same VPC? Because from the config it seems you are using private IPs to reach the Kafka cluster – Aman Juneja Aug 29 '18 at 12:20
  • I have confirmed connectivity with `telnet <ip address> <port>` on each server; everything connects fine. – ldrrp Aug 30 '18 at 18:01
  • Even number of Zookeeper nodes? (http://bytecontinnum.com/2016/09/zookeeper-always-configured-odd-number-nodes/) – earizon Aug 31 '18 at 11:43
  • You mean odd number, and yes; as per the docs, for a minimum cluster size I have 3. – ldrrp Aug 31 '18 at 17:25
  • At this point my team is building an Elasticsearch Kafka connector in Go. I'm not sure I can wait any longer before my director gets mad at me. I will keep trying other things on the side, though, for anyone who may be facing this issue as well. – ldrrp Aug 31 '18 at 17:27

7 Answers

3

After writing a connector in Go, I came across the same issue. I was forced to solve it myself.

When a connector connects to Kafka, it automatically writes to the Connect topics and to __offset_topics. When a connector crashes, it leaves a trace of itself in those topics as the coordinator. When a new connector starts up, it finds that record and attempts to communicate with the old coordinator; the coordinator fails to respond, and the connector never works.

You can fix this in one of two ways: delete all the topics (connect-configs, connect-offsets, connect-status, __offset_topics) and restart the cluster, or remove the stale coordinator record from those topics, which I am currently unsure how to do.
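A minimal sketch of the first option, assuming the Apache Kafka CLI tools under /opt/kafka/bin and ZooKeeper on localhost:2181 as in the question; the topic names must match your connect-distributed.properties:

# stop the Connect workers first, then delete the Connect internal topics
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic connect-offsets
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic connect-configs
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic connect-status
# the answer also deletes the consumer offsets topic; doing that affects every
# consumer group on the cluster, so treat that step with extra care
# afterwards, restart the brokers and start the Connect worker again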

ldrrp
  • It's not really clear why `The coordinator fails to respond and the connector never works.` all of a sudden. Were you able to figure out what was the root issue? Sorry, I know it's almost 2 years now, but worth a shot :D – Sanat Serikuly Apr 06 '20 at 13:48
3

Posting for others who might find it useful.

I had the same problem, and restarting Kafka solved it in my case.

After executing:

service kafka status

my log looked fine again in less than 10 seconds:

2019-11-08 14:30:19.781  INFO [-,,,] 1 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=datasources-ca-contacts] Discovered group coordinator myserver:9092 (id: 2147483647 rack: null)
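For completeness, a minimal sketch of the restart itself, assuming Kafka is installed as a service named kafka as in the answer (adjust the service name and init system for your setup):

sudo service kafka restart
sudo service kafka status    # the coordinator log line above showed up shortly after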
Federico Piazza
1

Deleting the broker data and restarting Kafka did the trick for me: delete the kafka-logs and zookeeper folders from the /tmp folder on macOS.
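A sketch of that cleanup, assuming the quickstart defaults log.dirs=/tmp/kafka-logs and dataDir=/tmp/zookeeper; note that this wipes all topic data:

# stop Kafka and ZooKeeper first
rm -rf /tmp/kafka-logs /tmp/zookeeper
# then start ZooKeeper and Kafka again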

0

I got this error when I ran out of disk space on my brokers. Might be something worth checking.
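A quick way to check, assuming the broker's log.dirs points at /var/lib/kafka (the path here is an assumption; use whatever server.properties actually says):

df -h /var/lib/kafka        # free space on the filesystem holding the log dir
du -sh /var/lib/kafka/*     # which topic partitions are using the space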

Benjamin Berman
0

I think the cause that @ldrrp mentions is correct, but if you don't want to (or can't) delete the "offsets cache", you can use a new group for your consumer.

//before: original group cause error
props.put("group.id", "my_data_consumer");

//after: works for me
props.put("group.id", "my_data_consumer_2");
Adán Escobar
0

After rebooting the brokers, this problem was resolved.

-2

Add the hostnames of all Kafka brokers to your /etc/hosts file, and try again.
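A sketch of what that could look like on the Connect host, using the broker IPs from the question; the hostnames here are made up and should match whatever the brokers actually advertise:

# /etc/hosts
172.25.1.2   kafka-broker-1
172.25.1.3   kafka-broker-2
172.25.1.4   kafka-broker-3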

Kai