
I tried to produce some JSON data to a multi-node Kafka cluster using confluent-kafka-python.

When I execute the Python script, the following errors occur:

> %3|1680100333.779|FAIL|rdkafka#producer-1| [thrd:100.25.177.77:9095/bootstrap]: 100.25.177.77:9095/bootstrap: Connect to ipv4#100.25.177.77:9095 failed: Connection refused (after 190ms in state CONNECT)

> %3|1680100333.973|FAIL|rdkafka#producer-1| [thrd:54.152.58.40:9094/bootstrap]: 54.152.58.40:9094/bootstrap: Connect to ipv4#54.152.58.40:9094 failed: Connection refused (after 194ms in state CONNECT)

In addition, the data were produced to only one partition. This is my delivery callback:

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to topic : {msg.topic()} [partition : {msg.partition()}]')

It only ever prints:

Message delivered to topic : json_test [partition : 0]
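To confirm whether messages are really spread across partitions, a variant of the callback above can tally deliveries per (topic, partition). This is a sketch: `delivery_counts` is a hypothetical name, and it assumes the callback is registered via `producer.produce(..., on_delivery=delivery_report)` and that `producer.poll()` or `producer.flush()` is called so that delivery callbacks actually fire.

```python
from collections import Counter

# Hypothetical tally of successful deliveries, keyed by (topic, partition).
delivery_counts = Counter()


def delivery_report(err, msg):
    """Delivery callback: log failures and count successes per partition."""
    if err is not None:
        print(f'Message delivery failed: {err}')
        return
    delivery_counts[(msg.topic(), msg.partition())] += 1
    print(f'Message delivered to topic : {msg.topic()} [partition : {msg.partition()}]')
```

After `producer.flush()`, printing `delivery_counts` shows at a glance whether more than one partition received data.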

So I checked log.dir on each broker: the topic was created on only one node. The code that creates the topic is:

new_topics = [NewTopic('ec2_json_test', num_partitions=3, replication_factor=1)]

I ran the script many times, and the node on which the topic gets created (i.e. the one the client actually connects to) is chosen at random; it is not always the same broker.
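One plausible reading of "all partitions on one node": when a topic is created, its partitions can only be placed on brokers that are currently registered in the cluster. The sketch below is a simplified model of round-robin placement for replication_factor=1; the function name and `start_index` parameter are illustrative, and Kafka's real assignment also randomizes the starting broker and handles racks and higher replication factors.

```python
def assign_partitions(num_partitions, live_broker_ids, start_index=0):
    """Simplified round-robin leader placement for replication_factor=1.

    Partition p goes to broker (start_index + p) mod N, where N is the number
    of brokers currently registered. This is a model, not Kafka's exact code.
    """
    brokers = sorted(live_broker_ids)
    if not brokers:
        raise ValueError("no live brokers registered")
    return {p: brokers[(start_index + p) % len(brokers)] for p in range(num_partitions)}


print(assign_partitions(3, [1]))        # {0: 1, 1: 1, 2: 1}
print(assign_partitions(3, [1, 2, 3]))  # {0: 1, 1: 2, 2: 3}
```

With only one registered broker, every partition of a 3-partition topic necessarily ends up on that broker.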

Environment info

EC2: t2.micro x 3
Kafka: 2.13-3.4.0
Docker Swarm command:

docker service create \
  --name kafka1 \
  --mount type=volume,source=k1-logs,destination=/tmp/kafka-logs \
  -p 9093:9093 \
  --network kafka-net \
  --mode global \
  --constraint node.labels.kafka==1 \
  --env KAFKA_HEAP_OPTS="-Xmx400m -Xms400m" \
  humanlearning05/kafka \
  /kafka/bin/kafka-server-start.sh /kafka/config/server.properties \
  --override listener.security.protocol.map=INT:PLAINTEXT,EXT:PLAINTEXT \
  --override listeners=INT://:9092,EXT://0.0.0.0:9093 \
  --override inter.broker.listener.name=INT \
  --override zookeeper.connect=zookeeper:2181 \
  --override broker.id=1 \
  --override advertised.listeners=INT://:9092,EXT://PUBLIC-IP:9093

The options that differ for the other brokers are broker.id, the published port, the placement constraint, the volume source, the EXT port in listeners, and the EXT port in advertised.listeners.
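To make the per-broker differences explicit, the `--override` arguments can be generated from a single template. A sketch only: `broker_overrides` is a hypothetical helper, the mapping of broker 2 to port 9094 and broker 3 to port 9095 is an assumption based on the ports in the error messages, and `PUBLIC-IP-n` stands in for each node's real public address.

```python
def broker_overrides(broker_id, ext_port, public_ip, zk="zookeeper:2181"):
    """Build the --override arguments for one broker.

    Only broker.id and the EXT listener port/address vary per broker;
    the INT listener and the ZooKeeper address are shared by all brokers.
    """
    settings = {
        "listener.security.protocol.map": "INT:PLAINTEXT,EXT:PLAINTEXT",
        "listeners": f"INT://:9092,EXT://0.0.0.0:{ext_port}",
        "inter.broker.listener.name": "INT",
        "zookeeper.connect": zk,
        "broker.id": str(broker_id),
        "advertised.listeners": f"INT://:9092,EXT://{public_ip}:{ext_port}",
    }
    args = []
    for key, value in settings.items():
        args += ["--override", f"{key}={value}"]
    return args


# Assumed port layout: broker n listens externally on 9092 + n.
for broker_id, ext_port in [(1, 9093), (2, 9094), (3, 9095)]:
    print(" ".join(broker_overrides(broker_id, ext_port, f"PUBLIC-IP-{broker_id}")))
```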

The ZooKeeper container runs only on the manager node; its command is:

docker service create \
  --name zookeeper \
  --mount type=volume,source=zoo-data,destination=/tmp/zookeeper \
  --publish 2181:2181 \
  --network kafka-net \
  --mode global \
  --constraint node.labels.kafka==1 \
  humanlearning05/kafka \
  /kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties

kafka-net was created with: docker network create --driver overlay kafka-net

Kafka image (Dockerfile):

FROM ubuntu:16.04

RUN apt-get update && apt-get install openjdk-8-jre -y


ENV kafka_version=2.13-3.4.0
ADD ./kafka_${kafka_version}.tgz ./
RUN mv kafka_${kafka_version} kafka

Update: I ran kafkacat against the manager node on port 9093:

Metadata for all topics (from broker -1: 54.157.151.115:9093/bootstrap):
 1 brokers:
  broker 1 at 54.175.81.21:9093 (controller)
 4 topics:
  topic "ec2_json_test" with 3 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 1, replicas: 1, isrs: 1
    partition 2, leader 1, replicas: 1, isrs: 1
  topic "ec2_json_test-55" with 3 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 1, replicas: 1, isrs: 1
    partition 2, leader 1, replicas: 1, isrs: 1
  topic "ec2_json_test-95" with 3 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 1, replicas: 1, isrs: 1
    partition 2, leader 1, replicas: 1, isrs: 1
  topic "json_test" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

json_test was created with num_partitions = 3 when I ran Kafka manually on each node individually.
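In the metadata above, only one broker is listed and json_test reports a single partition; a one-partition topic by itself would explain why every message lands in partition 0. For repeated checks, a small parser can summarise `kcat -L` text output. This is a sketch: `summarize_metadata` is a hypothetical helper, and it relies on the plain-text layout shown above, which is meant for humans rather than being a stable format.

```python
import re


def summarize_metadata(kcat_output):
    """Return (broker_ids, {topic: {partition: leader}}) from `kcat -L` text."""
    brokers = [int(b) for b in re.findall(r"^\s*broker (\d+) at ", kcat_output, re.M)]
    topics = {}
    current = None
    for line in kcat_output.splitlines():
        topic = re.match(r'\s*topic "([^"]+)" with \d+ partitions', line)
        if topic:
            current = topic.group(1)
            topics[current] = {}
            continue
        part = re.match(r"\s*partition (\d+), leader (-?\d+)", line)
        if part and current is not None:
            # Map partition id -> leader broker id for the current topic.
            topics[current][int(part.group(1))] = int(part.group(2))
    return brokers, topics
```

Running it on `kcat -b <manager-ip>:9093 -L` output after each broker restart would show whether the broker count ever rises above 1.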

Connections to the other nodes' ports (9094 and 9095) are refused.
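The `nc -vz host port` check can be reproduced without any Kafka client using Python's standard `socket` module. A minimal sketch; `port_open` is a hypothetical helper name and the 3-second timeout is an arbitrary choice. A refused connection usually means nothing accepted the TCP handshake at that address and port, so the failure sits below the Kafka protocol.

```python
import socket


def port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds (like `nc -vz`)."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError, timeouts and unreachable hosts.
        return False
```

For example, `port_open("54.152.58.40", 9094)` and `port_open("100.25.177.77", 9095)` would be expected to return False for as long as the refusals above persist.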

humanlearning
  • This looks correct, at a glance, but please install kcat and show output from `kcat -L` command, then use `nc -vz broker-ip port` to test connectivity without using Kafka client libraries – OneCricketeer Mar 29 '23 at 14:39
  • Also, please don't create your own Kafka images (especially using an almost decade old version of Ubuntu as a base) .... Refer https://stackoverflow.com/a/51634499/2308683 – OneCricketeer Mar 29 '23 at 14:41
  • When I happened to check `docker service ls`, I found that the two worker nodes that don't have a ZooKeeper container restart repeatedly. The error message is `kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING`. Could this be the cause? But as far as I know, ingress (Docker Swarm's routing network) opens a service's port on the other nodes too, even if the service is created on only one node – humanlearning Mar 30 '23 at 06:17
  • `nc -vz ip port` results: connected: manager node (ZooKeeper container present): 2181, 9093; refused: manager node (ZooKeeper container present): 9093, 9094, 9095; worker nodes (no ZooKeeper container): 9093, 9094, 9095 @OneCricketeer – humanlearning Mar 30 '23 at 08:36
  • Yes, Zookeeper should not be restarting. As mentioned, don't create your own images, and that might not happen. Unclear what ports 9094 and 9095 are used for here... As mentioned, [edit] the post with `kcat -L` output, assuming you can connect to the broker... Also, I don't use Swarm, but [read this](https://github.com/wurstmeister/kafka-docker/wiki/Connectivity) as one possible solution. I recommend people use https://strimzi.io, which you should be able to use on k3s for example, for a simpler deployment (or use EKS) – OneCricketeer Mar 30 '23 at 17:13
