This is a simple Springboot application with a trivial rest controller with Dockerfile and docker-compose files. When I run zookeeper & kafka manually from cmd (bin):
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
then Springboot app works fine. Even if I run zookeeper & kafka from dockerfile and connect Springboot app to kafka. It works as well.
Now my goal is to create one docker-compose.yml with zookeeper, kafka, springboot app all together. docker-compose.yml file is ready but once I run
docker-compose up
I get:
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-foo-1, groupId=foo] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
Crucial classes:
@RestController
@RequestMapping("/api/employee")
@RequiredArgsConstructor
public class EmployeeController {
private final MessageProducer messageProducer;
@GetMapping
@ResponseStatus(HttpStatus.OK)
public String getAllEmployees() {
messageProducer.sendMessage("Send message to kafka message broker");
return "This is REST API test";
}
}
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
return new DefaultKafkaConsumerFactory<>(props);
}
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("foo");
}
}
@Configuration
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Component
@Slf4j
public class MessageListener {
private String payload;
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
payload = message;
log.info("Received Message in group 'foo': {}", message);
}
public String getPayload() {
return payload;
}
}
@Component
@Slf4j
public class MessageProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value(value = "${message.topic.name}")
private String topicName;
public void sendMessage(String message) {
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata());
} else {
log.error("Unable to send message=[{}] due to : {}", message, ex.getMessage());
}
});
}
}
// Dockerfile
FROM openjdk:17-alpine
ADD target/demo-0.0.1-SNAPSHOT.jar app.jar
ENTRYPOINT ["java","-jar","app.jar"]
// docker-compose.yml
version: '3.1'
services:
zookeeper:
image: confluentinc/cp-zookeeper
container_name: zookeeper
restart: unless-stopped
ports:
- 2181:2181
environment:
ZOOKEEPER_CLIENT_PORT: 2181
healthcheck:
test: "echo stat | nc localhost $ZOOKEEPER_CLIENT_PORT"
start_period: 20s
kafka:
container_name: kafka
image: confluentinc/cp-kafka
restart: unless-stopped
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_ADVERTISED_HOST_NAME:
healthcheck:
test: ["CMD", "nc", "-z", "localhost", "9092"]
start_period: 20s
app:
container_name: springbootdemotest
image: springbootdemotest
build: ./
ports:
- "8080:8080"
depends_on:
- zookeeper
- kafka
I thought about adding ports forwarding to docker-compose but it didn't help. ports: - 9092:9092