
I am creating a docker-compose stack for running some Spring Batch sample code. The docker-compose.yml:

version: "3.9"

services:
  db:
    build:
      context: ./db/
      dockerfile: Dockerfile
    container_name: db-server
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=root
    healthcheck:
      test: [ "CMD-SHELL", "pg_isready" ]
      interval: 10s
      timeout: 30s
      retries: 5
    networks:
      - sb-pg-net
    volumes:
      - db-data:/var/lib/postgresql/data
  zookeeper:
    image: confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-7.2.1}
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zookeeper:2888:3888
    volumes:
      - zk-data:/var/lib/zookeeper/data
      - zk-logs:/var/lib/zookeeper/log
    networks:
      - sb-pg-net
  kafka:
    image: confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-7.2.1}
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "9999:9999"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
    depends_on:
      - zookeeper
    healthcheck:
      test: nc -z localhost 9092 || exit 1
      start_period: 15s
      interval: 5s
      timeout: 10s
      retries: 10
    networks:
      - sb-pg-net
    volumes:
      - kafka-data:/var/lib/kafka/data
  kafka-manager:
    image: hlebalbau/kafka-manager:stable
    hostname: kafka-manager
    container_name: kafka-manager
    depends_on:
      - kafka
      - zookeeper
    command: -Dconfig.file=/kafka-manager/conf/application.conf -Dapplication.home=/kafkamanager
    environment:
      ZK_HOSTS: zookeeper:2181
      APPLICATION_SECRET: randomSecret
    networks:
      - sb-pg-net
    ports:
      - "9000:9000"
  manager-cluster-init:
    image: ellerbrock/alpine-bash-curl-ssl
    restart: "no"
    container_name: init
    environment:
      KAFKA_MANAGER_HOST: kafka-manager:9000
      KAFKA_CLUSTER_NAME: ${KAFKA_CLUSTER_NAME:-testing}
      KAFKA_VERSION: ${KAFKA_VERSION:-2.4.0}
      ZK_HOSTS: zookeeper:2181
    depends_on:
      - kafka-manager
      - kafka
      - zookeeper
    networks:
      - sb-pg-net
    command: /bin/bash /kafka-cluster-init.sh
    volumes:
      - "./kafka/kafka-cluster-init.sh:/kafka-cluster-init.sh"
  app:
    build:
      context: ./
      dockerfile: Dockerfile
    container_name: spring-batch-examples
    ports:
      - "8080:8080"
    environment:
      - SPRING_PROFILES_ACTIVE=docker
      - DB_URL=jdbc:postgresql://db-server:5432/springbatchdb
      - DB_USERNAME=postgres
      - DB_PASSWORD=root
      - API_PORT=8080
      - KAFKA_BROKER=kafka:9092
    networks:
      - sb-pg-net
    depends_on:
      db:
        condition: service_healthy
      kafka:
        condition: service_healthy
networks:
  sb-pg-net:
    driver: bridge

volumes:
  db-data:
  zk-data:
  zk-logs:
  kafka-data:
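
For reference, I bring the stack up and follow the app with the usual Compose workflow (v2 syntax; with v1 it is docker-compose):

docker compose up -d --build
docker compose logs -f app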

The db-server is a postgres:12-alpine image with some schemas, tables and data preloaded. The spring-batch app is built via the Dockerfile:

FROM eclipse-temurin:17-jdk-jammy AS builder

LABEL version="0.1"

WORKDIR /app

# Copy maven artifacts
COPY mvnw .
COPY .mvn .mvn
COPY pom.xml .
RUN chmod +x ./mvnw
# Download dependencies; this layer is only re-run when pom.xml changes
RUN ./mvnw dependency:go-offline -B

# Copy sources
COPY src src

# Build the package and copy to target path
RUN ./mvnw clean package verify -DskipTests

FROM eclipse-temurin:17-jre-jammy
ARG DEPENDENCY=/app/target
ENV LANG=C.UTF-8

# Copy the packaged application jar from the build stage
COPY --from=builder ${DEPENDENCY}/*.jar .

ENTRYPOINT [ "java", "-jar", "spring-batch-examples.jar" ]
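
Since the ENTRYPOINT hardcodes the jar name, it is worth sanity-checking that the artifact copied out of /app/target really ends up as spring-batch-examples.jar in the runtime image (a quick check; the image tag below is just the one I use):

docker build -t spring-batch-examples .
docker run --rm --entrypoint ls spring-batch-examples -l /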

The job in question reads from a CSV file and writes to a Kafka topic. There is an ItemWriter which delegates to the TopicWriter service:

public class EmployeeKafkaWriter implements ItemWriter<Employee> {
    @Autowired
    private TopicWriter topicWriter;

    @Override
    public void write(Chunk<? extends Employee> chunk) throws Exception {
        chunk.getItems().forEach(employee -> {
            topicWriter.write(employee);
        });
    }
}
@Service
public class TopicWriter {
    @Autowired
    private KafkaTemplate<String, Employee> kafkaTemplate;

    @Value("${app.kafka.producer.topic}")
    private String topic;

    public void write(Employee employee) {
        kafkaTemplate.send(topic, employee);
    }
}
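
As an aside, kafkaTemplate.send() is asynchronous: the call only enqueues the record, and failures surface as WARN logs on a producer thread rather than as step failures. A blocking variant of TopicWriter (a sketch, assuming spring-kafka 3.x, where send() returns a CompletableFuture):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class TopicWriter {
    @Autowired
    private KafkaTemplate<String, Employee> kafkaTemplate;

    @Value("${app.kafka.producer.topic}")
    private String topic;

    public void write(Employee employee) {
        try {
            // send() only enqueues the record; get() waits for the broker ack,
            // so a connectivity problem fails the step instead of just logging WARNs
            kafkaTemplate.send(topic, employee).get(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while publishing to " + topic, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Failed to publish to " + topic, e);
        }
    }
}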

The KafkaProducerConfig provides the kafkaTemplate bean and the broker details:

@Configuration
public class KafkaProducerConfig {
    @Value("${app.kafka.producer.server}")
    private String server;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        // max.poll.interval.ms is a consumer property; the producer logs it as "supplied but are not used yet"
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "654321");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Employee> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Employee> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
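
For what it's worth, Spring Boot's auto-configuration can build an equivalent producer from the standard spring.kafka.* keys alone (application-docker.properties already sets the bootstrap servers that way). A sketch of the remaining keys; the caveat is that the auto-configured bean is a KafkaTemplate<Object, Object>, so the @Autowired injection points would have to be loosened to match:

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer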

The properties are split across two files:

  • application-docker.properties:
# Datasource
spring.datasource.url=${DB_URL:nodburl}
spring.datasource.username=${DB_USERNAME:nouser}
spring.datasource.password=${DB_PASSWORD:nopassword}

server.port=${API_PORT:8080}

# Kafka
app.kafka.producer.topic=employees
app.kafka.producer.server=${KAFKA_BROKER:nobroker}
spring.kafka.producer.bootstrap-servers=${KAFKA_BROKER:nobroker}
kafka.bootstrap.servers=${KAFKA_BROKER:nobroker}
  • application.properties:
# Hibernate properties
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQLDialect
spring.jpa.show-sql=false
logging.level.org.hibernate.SQL=WARN
logging.level.org.hibernate.type.descriptor.sql.BasicBinder=WARN

spring.jpa.hibernate.ddl-auto=create

# Disable batch auto-start
spring.batch.job.enabled=false
# Database
spring.batch.initialize-schema=ALWAYS
spring.batch.jdbc.initialize-schema=ALWAYS

The jobs are triggered via a RestController which in turn uses JobLauncher.
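
The controller itself is not shown here, but going by the job name and parameters in the log below it is roughly of this shape (a sketch; the endpoint path and bean names are hypothetical):

import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class JobController {
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job csv2TopicJob; // the csv2Topic job seen in the log

    @PostMapping("/jobs/csv2Topic") // hypothetical path
    public String launch(@RequestParam(defaultValue = "employees.csv") String fileName) throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addDate("date", new Date())      // identifying parameter, as in the log
                .addString("fileName", fileName)  // identifying parameter, as in the log
                .toJobParameters();
        return jobLauncher.run(csv2TopicJob, params).getStatus().toString();
    }
}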

When I run only the kafka and db services using docker-compose and run the app locally, I can connect to the broker via localhost:9092. However, once I add the app as a service in the stack, the job fails. Error log:

2023-06-02T23:45:09.189Z  INFO 1 --- [           main] i.g.s.s.SpringBatchExamplesApplication   : Started SpringBatchExamplesApplication in 2.438 seconds (process running for 2.675)
2023-06-02T23:48:32.573Z  INFO 1 --- [nio-8080-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-06-02T23:48:32.573Z  INFO 1 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2023-06-02T23:48:32.574Z  INFO 1 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2023-06-02T23:48:32.630Z  INFO 1 --- [cTaskExecutor-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=csv2Topic]] launched with the following parameters: [{'date':'{value=Fri Jun 02 23:48:32 UTC 2023, type=class java.util.Date, identifying=true}','fileName':'{value=employees.csv, type=class java.lang.String, identifying=true}'}]
2023-06-02T23:48:32.648Z  INFO 1 --- [cTaskExecutor-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [csv2TopicStep]
2023-06-02T23:48:32.696Z  INFO 1 --- [cTaskExecutor-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
        acks = -1
        batch.size = 16384
        bootstrap.servers = [kafka:9092]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-1
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = true
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.adaptive.partitioning.enable = true
        partitioner.availability.timeout.ms = 0
        partitioner.class = null
        partitioner.ignore.keys = false
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

2023-06-02T23:48:32.733Z  INFO 1 --- [cTaskExecutor-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
2023-06-02T23:48:32.764Z  WARN 1 --- [cTaskExecutor-1] o.a.k.clients.producer.ProducerConfig    : These configurations '[max.poll.interval.ms]' were supplied but are not used yet.
2023-06-02T23:48:32.765Z  INFO 1 --- [cTaskExecutor-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.3.2
2023-06-02T23:48:32.765Z  INFO 1 --- [cTaskExecutor-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: b66af662e61082cb
2023-06-02T23:48:32.765Z  INFO 1 --- [cTaskExecutor-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1685749712764
2023-06-02T23:48:32.960Z  WARN 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {employees=LEADER_NOT_AVAILABLE}
2023-06-02T23:48:32.961Z  INFO 1 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: 64hzoa1jT-yvxgOdlI4Jjg
2023-06-02T23:48:32.976Z  INFO 1 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
2023-06-02T23:48:33.057Z  INFO 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node 1 disconnected.
2023-06-02T23:48:33.058Z  WARN 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.
2023-06-02T23:48:33.158Z  INFO 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node 1 disconnected.
2023-06-02T23:48:33.158Z  WARN 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.
2023-06-02T23:48:33.259Z  INFO 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node 1 disconnected.
2023-06-02T23:48:33.259Z  WARN 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.

I checked whether the broker is reachable from the app container:

root@695f8731cd2c:/# nc -vz kafka 9092
Connection to kafka (172.28.0.4) 9092 port [tcp/*] succeeded!
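
nc only proves the TCP port is reachable, though. As far as I understand, the endpoints a client actually uses come from the metadata the broker returns, which differs per listener; the CLI shipped in the cp-kafka image can dump what each listener hands back (run from the host):

docker exec kafka kafka-broker-api-versions --bootstrap-server kafka:29092 | head -n 1
docker exec kafka kafka-broker-api-versions --bootstrap-server kafka:9092 | head -n 1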

What is weird are these two lines in the log:

2023-06-02T23:48:33.057Z  INFO 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node 1 disconnected.
2023-06-02T23:48:33.058Z  WARN 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.

Why do they keep mentioning 127.0.0.1:9092 even though the provided broker URL is kafka:9092?

What am I doing wrong here?
