I tried to produce some json data to multinode kafka broker using confluent-kafka-python
When I execute the python script, an error message occur that said
> %3|1680100333.779|FAIL|rdkafka#producer-1| [thrd:100.25.177.77:9095/bootstrap]: 100.25.177.77:9095/bootstrap: Connect to ipv4#100.25.177.77:9095 failed: Connection refused (after 190ms in state CONNECT)
> %3|1680100333.973|FAIL|rdkafka#producer-1| [thrd:54.152.58.40:9094/bootstrap]: 54.152.58.40:9094/bootstrap: Connect to ipv4#54.152.58.40:9094 failed: Connection refused (after 194ms in state CONNECT)
and the data were produced to only one partition
def delivery_report(err, msg):
if err is not None:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to topic : {msg.topic()} [partition : {msg.partition()}]')
it only returns
Message delivered to topic : json_test [partition : 0],
So I check log.dir each broker topic was created only one node
creating topic code is
new_topics = [NewTopic('ec2_json_test', num_partitions=3, replication_factor=1)]
I executed script many times and the node that topic is created(actually connected with client) is selected randomly, not only one broker
Environment info
ec2 : t2.mircro * 3
kafka : 2.13-3.4.0
docker swarm command :
docker service create
--name kafka1
--mount type=volume,source=k1-logs,destination=/tmp/kafka-logs
-p 9093:9093
--network kafka-net
--mode global
--constraint node.labels.kafka==1
--env KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"
humanlearning05/kafka
/kafka/bin/kafka-server-start.sh /kafka/config/server.properties
--override listener.security.protocol.map=INT:PLAINTEXT,EXT:PLAINTEXT
--override listeners=INT://:9092,EXT://0.0.0.0:9093
--override inter.broker.listener.name=INT
--override zookeeper.connect=zookeeper:2181
--override broker.id=1
--override advertised.listeners=INT://:9092,EXT://PUBLIC-IP:9093
different option with other broker is [broker.id, port, constraint, volume source, listeners EXT port, advertised.listeners EXT port]
zookeeper container is only on manage node and command :
docker service create
--name zookeeper
--mount type=volume,source=zoo-data,destination=/tmp/zookeeper
--publish 2181:2181
--network kafka-net
--mode global
--constraint node.labels.kafka==1
humanlearning05/kafka
/kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties
kafka-net : docker network create --driver overlay kafka-net
kafka image :
From ubuntu:16.04
RUN apt-get update && apt-get install openjdk-8-jre -y
ENV kafka_version=2.13-3.4.0
ADD ./kafka_${kafka_version}.tgz ./
RUN mv kafka_${kafka_version} kafka
+ I use kafkacat about manager node:9093
Metadata for all topics (from broker -1: 54.157.151.115:9093/bootstrap):
1 brokers:
broker 1 at 54.175.81.21:9093 (controller)
4 topics:
topic "ec2_json_test" with 3 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
partition 1, leader 1, replicas: 1, isrs: 1
partition 2, leader 1, replicas: 1, isrs: 1
topic "ec2_json_test-55" with 3 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
partition 1, leader 1, replicas: 1, isrs: 1
partition 2, leader 1, replicas: 1, isrs: 1
topic "ec2_json_test-95" with 3 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
partition 1, leader 1, replicas: 1, isrs: 1
partition 2, leader 1, replicas: 1, isrs: 1
topic "json_test" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
json_test's num partition = 3 It was created when I run kafka manually, respectively in each node
other node:port(9094, 9095) return connection refused