I am creating a docker-compose stack for running some spring-batch sample code. The `docker-compose.yml`:
version: "3.9"
services:
  db:
    build:
      context: ./db/
      dockerfile: Dockerfile
    container_name: db-server
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=root
    healthcheck:
      test: [ "CMD-SHELL", "pg_isready" ]
      interval: 10s
      timeout: 30s
      retries: 5
    networks:
      - sb-pg-net
    volumes:
      - db-data:/var/lib/postgresql/data
  zookeeper:
    image: confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-7.2.1}
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zookeeper:2888:3888
    volumes:
      - zk-data:/var/lib/zookeeper/data
      - zk-logs:/var/lib/zookeeper/log
    networks:
      - sb-pg-net
  kafka:
    image: confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-7.2.1}
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "9999:9999"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
    depends_on:
      - zookeeper
    healthcheck:
      test: nc -z localhost 9092 || exit -1
      start_period: 15s
      interval: 5s
      timeout: 10s
      retries: 10
    networks:
      - sb-pg-net
    volumes:
      - kafka-data:/var/lib/kafka/data
  kafka-manager:
    image: hlebalbau/kafka-manager:stable
    hostname: kafka-manager
    container_name: kafka-manager
    depends_on:
      - kafka
      - zookeeper
    command: -Dconfig.file=/kafka-manager/conf/application.conf -Dapplication.home=/kafkamanager
    environment:
      ZK_HOSTS: zookeeper:2181
      APPLICATION_SECRET: randomSecret
    networks:
      - sb-pg-net
    ports:
      - "9000:9000"
  manager-cluster-init:
    image: ellerbrock/alpine-bash-curl-ssl
    restart: "no"
    container_name: init
    environment:
      KAFKA_MANAGER_HOST: kafka-manager:9000
      KAFKA_CLUSTER_NAME: ${KAFKA_CLUSTER_NAME:-testing}
      KAFKA_VERSION: ${KAFKA_VERSION:-2.4.0}
      ZK_HOSTS: zookeeper:2181
    depends_on:
      - kafka-manager
      - kafka
      - zookeeper
    networks:
      - sb-pg-net
    command: /bin/bash /kafka-cluster-init.sh
    volumes:
      - "./kafka/kafka-cluster-init.sh:/kafka-cluster-init.sh"
  app:
    build:
      context: ./
      dockerfile: Dockerfile
    container_name: spring-batch-examples
    ports:
      - "8080:8080"
    environment:
      - SPRING_PROFILES_ACTIVE=docker
      - DB_URL=jdbc:postgresql://db-server:5432/springbatchdb
      - DB_USERNAME=postgres
      - DB_PASSWORD=root
      - API_PORT=8080
      - KAFKA_BROKER=kafka:9092
    networks:
      - sb-pg-net
    depends_on:
      db:
        condition: service_healthy
      kafka:
        condition: service_healthy
networks:
  sb-pg-net:
    driver: bridge
volumes:
  db-data:
  zk-data:
  zk-logs:
  kafka-data:
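The compose file picks up a few optional variables, falling back to the inline defaults when they are unset; for completeness, an example `.env`:

# all optional; the inline defaults in docker-compose.yml apply when unset
CONFLUENT_PLATFORM_VERSION=7.2.1
DOCKER_HOST_IP=127.0.0.1
KAFKA_CLUSTER_NAME=testing
KAFKA_VERSION=2.4.0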
The `db-server` is a `postgres:12-alpine` image with some schemas, tables and data preloaded. Its `db/Dockerfile` boils down to something like this (simplified; the init script names are illustrative):
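FROM postgres:12-alpine
# SQL files placed in /docker-entrypoint-initdb.d run automatically the first
# time the container starts with an empty data directory (the db-data volume)
COPY ./init/01-schema.sql /docker-entrypoint-initdb.d/
COPY ./init/02-data.sql /docker-entrypoint-initdb.d/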
The spring-batch app is built via the `Dockerfile`:
FROM eclipse-temurin:17-jdk-jammy as builder
LABEL version="0.1"
WORKDIR /app
# Copy the Maven wrapper and build descriptor
COPY mvnw .
COPY .mvn .mvn
COPY pom.xml .
RUN chmod +x ./mvnw
# Download dependencies only when pom.xml changes (cached layer)
RUN ./mvnw dependency:go-offline -B
# Copy sources
COPY src src
# Build the package
RUN ./mvnw clean package verify -DskipTests

FROM eclipse-temurin:17-jre-jammy
ARG DEPENDENCY=/app/target
ENV LANG C.UTF-8
# Copy the application jar from the build stage
COPY --from=builder ${DEPENDENCY}/*.jar .
ENTRYPOINT [ "java", "-jar", "spring-batch-examples.jar" ]
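For the `ENTRYPOINT` to find `spring-batch-examples.jar`, the build produces a jar with a fixed `finalName` in `pom.xml`, along the lines of:

<build>
    <finalName>spring-batch-examples</finalName>
    <!-- plugins omitted -->
</build>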
The job in question reads from a CSV file and writes to a Kafka topic. There is an `ItemWriter` which uses the `TopicWriter` service:
public class EmployeeKafkaWriter implements ItemWriter<Employee> {

    @Autowired
    private TopicWriter topicWriter;

    @Override
    public void write(Chunk<? extends Employee> chunk) throws Exception {
        chunk.getItems().forEach(employee -> topicWriter.write(employee));
    }
}

@Service
public class TopicWriter {

    @Autowired
    private KafkaTemplate<String, Employee> kafkaTemplate;

    @Value("${app.kafka.producer.topic}")
    private String topic;

    public void write(Employee employee) {
        kafkaTemplate.send(topic, employee);
    }
}
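The writer is wired into a chunk-oriented step; the job configuration looks roughly like this (chunk size, file location and CSV column names are simplified here):

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class Csv2TopicJobConfig {

    @Bean
    public Job csv2TopicJob(JobRepository jobRepository, Step csv2TopicStep) {
        return new JobBuilder("csv2Topic", jobRepository)
                .start(csv2TopicStep)
                .build();
    }

    @Bean
    public Step csv2TopicStep(JobRepository jobRepository,
                              PlatformTransactionManager transactionManager,
                              FlatFileItemReader<Employee> employeeReader,
                              // EmployeeKafkaWriter must be registered as a Spring bean
                              // (e.g. @Component) for the @Autowired TopicWriter to work
                              EmployeeKafkaWriter employeeKafkaWriter) {
        return new StepBuilder("csv2TopicStep", jobRepository)
                .<Employee, Employee>chunk(10, transactionManager)
                .reader(employeeReader)
                .writer(employeeKafkaWriter)
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Employee> employeeReader(
            // bound from the 'fileName' job parameter seen in the log below
            @Value("#{jobParameters['fileName']}") String fileName) {
        return new FlatFileItemReaderBuilder<Employee>()
                .name("employeeReader")
                .resource(new ClassPathResource(fileName))
                .delimited()
                .names("id", "firstName", "lastName")
                .targetType(Employee.class)
                .build();
    }
}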
The `KafkaProducerConfig` provides the `kafkaTemplate` bean, and the details for the broker:
@Configuration
public class KafkaProducerConfig {

    @Value("${app.kafka.producer.server}")
    private String server;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        // max.poll.interval.ms is a consumer property; the producer warns that it is unused
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "654321");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Employee> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Employee> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
The properties are split across two files. `application-docker.properties`:
# Datasource
spring.datasource.url=${DB_URL:nodburl}
spring.datasource.username=${DB_USERNAME:nouser}
spring.datasource.password=${DB_PASSWORD:nopassword}
server.port=${API_PORT:8080}
# Kafka
app.kafka.producer.topic=employees
app.kafka.producer.server=${KAFKA_BROKER:nobroker}
spring.kafka.producer.bootstrap-servers=${KAFKA_BROKER:nobroker}
kafka.bootstrap.servers=${KAFKA_BROKER:nobroker}
`application.properties`:
# Hibernate properties
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQLDialect
spring.jpa.show-sql=false
logging.level.org.hibernate.SQL=WARN
logging.level.org.hibernate.type.descriptor.sql.BasicBinder=WARN
spring.jpa.hibernate.ddl-auto=create
# Disable batch auto-start
spring.batch.job.enabled=false
# Database
spring.batch.initialize-schema=ALWAYS
spring.batch.jdbc.initialize-schema=ALWAYS
The jobs are triggered via a `RestController`, which in turn uses a `JobLauncher`.
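The controller is roughly this (endpoint path and parameter handling are simplified; the job parameters match the ones visible in the log below):

import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class JobController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job csv2TopicJob;

    @PostMapping("/jobs/csv2Topic")
    public ResponseEntity<String> launch(
            @RequestParam(defaultValue = "employees.csv") String fileName) throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addDate("date", new Date())     // 'date' parameter in the log
                .addString("fileName", fileName) // 'fileName' parameter in the log
                .toJobParameters();
        JobExecution execution = jobLauncher.run(csv2TopicJob, params);
        return ResponseEntity.ok(execution.getStatus().toString());
    }
}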
When I run only the kafka & db services using `docker-compose` and run the app locally, I am able to connect to the broker via `localhost:9092`. However, once I add the app to the stack, the job fails. Error log:
2023-06-02T23:45:09.189Z INFO 1 --- [           main] i.g.s.s.SpringBatchExamplesApplication : Started SpringBatchExamplesApplication in 2.438 seconds (process running for 2.675)
2023-06-02T23:48:32.573Z INFO 1 --- [nio-8080-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-06-02T23:48:32.573Z INFO 1 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2023-06-02T23:48:32.574Z INFO 1 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2023-06-02T23:48:32.630Z INFO 1 --- [cTaskExecutor-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=csv2Topic]] launched with the following parameters: [{'date':'{value=Fri Jun 02 23:48:32 UTC 2023, type=class java.util.Date, identifying=true}','fileName':'{value=employees.csv, type=class java.lang.String, identifying=true}'}]
2023-06-02T23:48:32.648Z INFO 1 --- [cTaskExecutor-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [csv2TopicStep]
2023-06-02T23:48:32.696Z INFO 1 --- [cTaskExecutor-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [kafka:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.adaptive.partitioning.enable = true
partitioner.availability.timeout.ms = 0
partitioner.class = null
partitioner.ignore.keys = false
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
2023-06-02T23:48:32.733Z INFO 1 --- [cTaskExecutor-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
2023-06-02T23:48:32.764Z WARN 1 --- [cTaskExecutor-1] o.a.k.clients.producer.ProducerConfig : These configurations '[max.poll.interval.ms]' were supplied but are not used yet.
2023-06-02T23:48:32.765Z INFO 1 --- [cTaskExecutor-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.3.2
2023-06-02T23:48:32.765Z INFO 1 --- [cTaskExecutor-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: b66af662e61082cb
2023-06-02T23:48:32.765Z INFO 1 --- [cTaskExecutor-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1685749712764
2023-06-02T23:48:32.960Z WARN 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {employees=LEADER_NOT_AVAILABLE}
2023-06-02T23:48:32.961Z INFO 1 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: 64hzoa1jT-yvxgOdlI4Jjg
2023-06-02T23:48:32.976Z INFO 1 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
2023-06-02T23:48:33.057Z INFO 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node 1 disconnected.
2023-06-02T23:48:33.058Z WARN 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.
2023-06-02T23:48:33.158Z INFO 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node 1 disconnected.
2023-06-02T23:48:33.158Z WARN 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.
2023-06-02T23:48:33.259Z INFO 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node 1 disconnected.
2023-06-02T23:48:33.259Z WARN 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.
I checked whether the service is reachable from the app container:
root@695f8731cd2c:/# nc -vz kafka 9092
Connection to kafka (172.28.0.4) 9092 port [tcp/*] succeeded!
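For reference, the endpoints the broker actually advertises can also be dumped from its ZooKeeper registration (`zookeeper-shell` ships with the cp-zookeeper image):

# print broker 1's registration, which includes its advertised endpoints
docker exec zookeeper zookeeper-shell localhost:2181 get /brokers/ids/1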
What is weird are these two lines in the log:
2023-06-02T23:48:33.057Z INFO 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node 1 disconnected.
2023-06-02T23:48:33.058Z WARN 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.
Why do they keep mentioning `127.0.0.1:9092` even though the configured broker URL is `kafka:9092`?
What am I doing wrong here?