Small question regarding connecting a Spring Kafka application to a Dockerized Kafka please.
The docker-compose file is:
version: "3.9"
services:
  kafka:
    networks: [ "mynetwork" ]
    container_name: kafka.mynetwork
    hostname: kafka0
    image: hexhex/kafka-kraft:0.0.10
    environment:
      ADVERTISED_LISTENERS: kafka0:9092
    ports:
      - 9092:9092
  kafka-ui:
    networks: [ "mynetwork" ]
    container_name: kafka-ui.mynetwork
    hostname: kafka-ui.mynetwork
    image: provectuslabs/kafka-ui
    ports:
      - 8080:8080
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka0:9092
    depends_on:
      - kafka
networks:
  mynetwork:
    name: mynetwork
I strongly believe the docker-compose file, with its two services, is correct.
I am able to connect to http://localhost:8080 and see the Kafka-UI. I can create topics, etc.
I am able to use the console shell inside the kafka container to send and consume messages.
Now, I have a very straightforward Spring Kafka app, following an online tutorial:
https://www.baeldung.com/spring-kafka
with the code:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(greetingConsumerFactory());
        return factory;
    }
}
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Greeting> greetingProducerFactory() {
        final Map<String, Object> configProps = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
        return new KafkaTemplate<>(greetingProducerFactory());
    }
}
@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "greetingTopicName", groupId = "groupId")
    public void greetingListener(Greeting greeting) {
        System.out.println("greeting " + greeting);
    }
}
@Service
public class KafKaProducerService {

    private static final Logger logger = LoggerFactory.getLogger(KafKaProducerService.class);

    @Autowired
    private KafkaTemplate<String, Greeting> kafkaTemplate;

    public void sendGreetingMessage(Greeting greeting) {
        logger.info(String.format("Message sending -> %s", greeting));
        kafkaTemplate.send("greetingTopicName", greeting);
        logger.info(String.format("Message sent -> %s", greeting));
    }
}
For the bootstrap server, I tried localhost:9092, http://localhost:9092, 0.0.0.0:9092, 127.0.0.1:9092, and kafka0:9092. Unfortunately, all of them yield:
WARN 21172 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-groupId-1, groupId=groupId] Error connecting to node kafka0:9092 (id: 2147483646 rack: null)
java.net.UnknownHostException: No such host is known (kafka0)
I am on Windows, using the latest Docker.
Here is the complete trace:
INFO 21172 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
WARN 21172 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-groupId-1, groupId=groupId] Error connecting to node kafka0:9092 (id: 2147483646 rack: null)
java.net.UnknownHostException: No such host is known (kafka0)
at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[na:na]
at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:933) ~[na:na]
at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1529) ~[na:na]
at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:852) ~[na:na]
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1519) ~[na:na]
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1377) ~[na:na]
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1305) ~[na:na]
at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:510) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:990) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:590) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:901) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:877) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:265) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnreadySync(ConsumerCoordinator.java:492) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:524) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1276) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240) ~[kafka-clients-3.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1665) ~[spring-kafka-3.0.0.jar:3.0.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1640) ~[spring-kafka-3.0.0.jar:3.0.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1441) ~[spring-kafka-3.0.0.jar:3.0.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1338) ~[spring-kafka-3.0.0.jar:3.0.0]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
INFO 21172 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-groupId-1, groupId=groupId] (Re-)joining group
WARN 21172 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-groupId-1, groupId=groupId] Error connecting to node kafka0:9092 (id: 2147483646 rack: null)
java.net.UnknownHostException: kafka0
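To narrow this down: the trace shows bootstrap.servers = [localhost:9092], yet the warning names kafka0:9092, and the exception is thrown from DefaultHostResolver.resolve, which looks like a failed hostname lookup rather than a refused connection. Below is a minimal sketch, using only the JDK, to check from the laptop whether that name resolves at all. The class name HostCheck and the method resolves are made up for illustration; kafka0 is the hostname from the compose file above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper, not part of the Spring app: checks hostname resolution from this JVM.
final class HostCheck {

    // Returns true if the JVM on this machine can resolve the given hostname or IP literal.
    public static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // "kafka0" is the name reported in the warning; "localhost" is the configured bootstrap host.
        for (String host : new String[] {"localhost", "kafka0"}) {
            System.out.println(host + " resolves: " + resolves(host));
        }
    }
}
```

If this reports that kafka0 does not resolve on the host, the lookup failure from the trace is reproduced outside of Spring and Kafka, which would suggest the problem is the name itself rather than the port mapping.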
To avoid confusion: this is a Spring Kafka app (not inside Docker) running on my laptop, trying to connect to Kafka inside Docker.
It is not a Dockerized Spring app trying to connect to a Dockerized Kafka, and it is not a bare-metal Spring Kafka app trying to connect to a bare-metal Kafka.
Thank you