I have created a simple app to learn how to use Kafka, and I have used Confluent Kafka. When I build the Docker Compose, everything seems fine, but when I check the Docker logs, I notice that the producer doesn't seem to write, and it never enters the delivery_callback to notify me of successful writing.
Does anyone have any idea why it's not working?
Below are the portions of the code: Docker Compose:
version: "3.8"
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeperTest
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:latest
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: "true"
CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"
schema-registry:
image: confluentinc/cp-schema-registry:latest
container_name: schema-registryTest
hostname: http://broker-schema-registry
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://broker:29092
SCHEMA_REGISTRY_HOST_NAME: http://broker-schema-registry:8081
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
restart: always
producer:
build: ./producer
depends_on:
- broker
consumer:
build: ./consumer
depends_on:
- broker
Producer:
from confluent_kafka import Producer
import logging
producer = Producer({"bootstrap.servers": "broker:9092",
'request.timeout.ms': 100000, 'delivery.timeout.ms': 100000})
logging.basicConfig(level=logging.INFO)
def delivery_callback(err, msg):
if err is not None:
logging.info(f"Failed to deliver message: {err}")
else:
logging.info(f"Message delivered to {msg.topic()} [{msg.partition()}]")
for i in range(10):
logging.info("Produce message")
producer.produce(
"test-topic", f"Message {i}".encode("utf-8"), callback=delivery_callback)
logging.info("Before poll Produce message")
producer.poll(0)
logging.info("After poll Produce message")
producer.flush()
Consumer:
from confluent_kafka import Consumer, KafkaError
consumer = Consumer({"bootstrap.servers": "broker:9092",
"group.id": "my-consumer-group"})
consumer.subscribe(["test-topic"])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print("Reached end of partition")
else:
print(f"Error while consuming message: {msg.error()}")
else:
print(f"Received message: {msg.value().decode('utf-8')}")
consumer.close()
I tried to create a bridge connecting ZooKeeper and the broker, but I didn't see any changes. I also attempted to use Apache UI, but it seems that it doesn't detect anything either.