I have a springboot application which is using kafka consumer and producer. I hosted my application image on docker.Kafka server runs fine on docker but my application is not communicating with that hosted server.Sharing all the config file. Please let me know where I am going wrong
Kafka config in springboot
@Configuration
@EnableKafka
public class TestConfig {
@Bean
public ConsumerFactory<String, UpdateStatusObject> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(JsonSerializer.TYPE_MAPPINGS, "up:com.moviebookingapp.models.UpdateStatusObject");
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.moviebookingapp.models.UpdateStatusObject");
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<String, UpdateStatusObject>(config, new StringDeserializer(), new JsonDeserializer<>(UpdateStatusObject.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, UpdateStatusObject> kafkaLister() {
ConcurrentKafkaListenerContainerFactory<String, UpdateStatusObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setMissingTopicsFatal(false);
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ProducerFactory<String, UpdateStatusObject> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
config.put(JsonSerializer.TYPE_MAPPINGS, "up:com.moviebookingapp.models.UpdateStatusObject");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, UpdateStatusObject> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Kafka topic config
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic ticketAvailabilityTopic() {
return new NewTopic("ticket-availability-topic", 1, (short) 1);
}
}
Docker compose file
version: '3.9'
services:
mongodb:
image: mongo:latest
container_name: moviebookingappmongodb
ports:
- "27017:27017"
zookeeper:
image: wurstmeister/zookeeper:latest
container_name: moviebookingappzookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:latest
container_name: moviebookingappkafka
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
moviebookingappspringboot:
image: moviebookingapp:1.0
container_name: moviebookingappspringboot
restart: always
ports:
- "8080:8080"
environment:
SPRING_KAFKA_CONSUMER_BOOTSTRAP_SERVERS: kafka:9092
SPRING_KAFKA_PRODUCER_BOOTSTRAP_SERVERS: kafka:9092
links:
- mongodb
- zookeeper
- kafka
Error
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group_id-1, groupId=group_id] Node -1 disconnected.
WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group_id-1, groupId=group_id] Connection to node -1 (kafka/172.27.0.4:9092) could not be established. Broker may not be available.
WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group_id-1, groupId=group_id] Bootstrap broker kafka:9092 (id: -1 rack: null) disconnected