This is a follow-up question to my previous post. I am fairly new to both Kafka and Docker, so I would appreciate a detailed explanation.
I successfully set up my Kafka cluster using Docker, together with ZooKeeper and Kafdrop. Now I want to connect my Spring project to this cluster using Spring Kafka.
At first it connected successfully and I saw that the topics were created automatically. But then I ran into the following log output, which keeps repeating:
11:27:27.727 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=*****, groupId=*****] Group coordinator 66aa896dd4c0:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
11:27:27.730 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=*****, groupId=*****] Discovered group coordinator 66aa896dd4c0:9092 (id: 2147482646 rack: null)
11:27:27.730 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=*****, groupId=c*****] Discovered group coordinator 6043a3559f58:9092 (id: 2147482644 rack: null)
11:27:27.730 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=*****, groupId=*****] Discovered group coordinator 6043a3559f58:9092 (id: 2147482644 rack: null)
11:27:27.730 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=*****, groupId=*****] Group coordinator 66aa896dd4c0:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
11:27:27.730 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=*****, groupId=*****] Group coordinator 6043a3559f58:9092 (id: 2147482644 rack: null) is unavailable or invalid, will attempt rediscovery
11:27:27.730 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=*****, groupId=*****] Group coordinator 6043a3559f58:9092 (id: 2147482644 rack: null) is unavailable or invalid, will attempt rediscovery
11:27:27.732 INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=*****, groupId=*****] Discovered group coordinator 66aa896dd4c0:9092 (id: 2147482646 rack: null)
Based on this log, the group coordinator keeps being discovered and lost.
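The coordinator hosts in the log (66aa896dd4c0, 6043a3559f58) look like Docker container IDs rather than the localhost addresses I configured, so one thing I can check is whether those names resolve at all from the machine running the Spring app. Below is a minimal sketch of such a check using only the JDK. The class name CoordinatorCheck and its methods are hypothetical helpers of my own, not part of Kafka or Spring Kafka, and the regular expression simply assumes the log format shown above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical diagnostic helper; not part of Kafka or Spring Kafka.
public class CoordinatorCheck {

    // Assumes the "group coordinator <host>:<port>" wording seen in the log above.
    private static final Pattern COORDINATOR =
            Pattern.compile("[Gg]roup coordinator (\\S+):(\\d+)");

    /** Returns the coordinator host named in a log line, if the line mentions one. */
    public static Optional<String> extractHost(String logLine) {
        Matcher m = COORDINATOR.matcher(logLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** Returns true if the host name can be resolved to an IP address from this machine. */
    public static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String line = "Discovered group coordinator 66aa896dd4c0:9092 (id: 2147482646 rack: null)";
        // Print whether the coordinator host from the log is resolvable here.
        extractHost(line).ifPresent(host ->
                System.out.println(host + " resolves from this machine: " + resolves(host)));
    }
}
```

If the host does not resolve, that would suggest the brokers are advertising container-internal hostnames to clients outside Docker, but that is only a guess on my part and I have not confirmed it.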
This is my consumer config:
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, 1);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "1500");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}
And this is my producer config:
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}