0

This is a simple Springboot application with a trivial rest controller with Dockerfile and docker-compose files. When I run zookeeper & kafka manually from cmd (bin):

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

then Springboot app works fine. Even if I run zookeeper & kafka from dockerfile and connect Springboot app to kafka. It works as well.

Now my goal is to create one docker-compose.yml with zookeeper, kafka, springboot app all together. docker-compose.yml file is ready but once I run

docker-compose up

I get:

org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-foo-1, groupId=foo] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

Crucial classes:

@RestController
@RequestMapping("/api/employee")
@RequiredArgsConstructor
public class EmployeeController {
    private final MessageProducer messageProducer;

    @GetMapping
    @ResponseStatus(HttpStatus.OK)
    public String getAllEmployees() {
        messageProducer.sendMessage("Send message to kafka message broker");
        return "This is REST API test";
    }
}



@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(groupId));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("foo");
    }
}


@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}



@Component
@Slf4j
public class MessageListener {

    private String payload;

    @KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
    public void listenGroupFoo(String message) {
        payload = message;
        log.info("Received Message in group 'foo': {}", message);
    }

    public String getPayload() {
        return payload;
    }
}



@Component
@Slf4j
public class MessageProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value(value = "${message.topic.name}")
    private String topicName;

    public void sendMessage(String message) {

        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
        future.whenComplete((result, ex) -> {

            if (ex == null) {
                log.info("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata());
            } else {
                log.error("Unable to send message=[{}] due to : {}", message, ex.getMessage());
            }
        });
    }
}

// Dockerfile

FROM openjdk:17-alpine
ADD target/demo-0.0.1-SNAPSHOT.jar app.jar
ENTRYPOINT ["java","-jar","app.jar"]

// docker-compose.yml

  version: '3.1'
  services:
    zookeeper:
      image: confluentinc/cp-zookeeper
      container_name: zookeeper
      restart: unless-stopped
      ports:
        - 2181:2181
      environment:
        ZOOKEEPER_CLIENT_PORT: 2181
      healthcheck:
        test: "echo stat | nc localhost $ZOOKEEPER_CLIENT_PORT"
        start_period: 20s

    kafka:
      container_name: kafka
      image: confluentinc/cp-kafka
      restart: unless-stopped
      depends_on:
        - zookeeper
      ports:
        - 9092:9092
      environment:
        KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
        KAFKA_DELETE_TOPIC_ENABLE: "true"
        KAFKA_ADVERTISED_HOST_NAME:
      healthcheck:
        test: ["CMD", "nc", "-z", "localhost", "9092"]
        start_period: 20s

    app:
      container_name: springbootdemotest
      image: springbootdemotest
      build: ./
      ports:
        - "8080:8080"
      depends_on:
        - zookeeper
        - kafka

I thought about adding ports forwarding to docker-compose but it didn't help. ports: - 9092:9092

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
andrew
  • 3,083
  • 4
  • 24
  • 29
  • 1) depends_on won't wait for the broker container to actually start 2) You should ideally add environment variable to your Spring container to point to the location of the broker container, which is `kafka:29092`, not localhost:9092... Port forwarding isn't needed between containers in Compose 3) Please try using Spring boot docker plugin (maven/gradle) rather than writing your own Dockerfile – OneCricketeer Apr 17 '23 at 13:45

0 Answers0