I created a simple app to learn Kafka, using Confluent Kafka. When I bring up the stack with Docker Compose, everything seems fine, but the Docker logs show that the producer never appears to write anything, and delivery_callback is never invoked to confirm a successful write.

Does anyone have any idea why it's not working?

Below are the relevant portions of the code.

Docker Compose:

version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeperTest
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:latest
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    container_name: schema-registryTest
    hostname: http://broker-schema-registry
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://broker:29092
      SCHEMA_REGISTRY_HOST_NAME: http://broker-schema-registry:8081
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    restart: always

  producer:
    build: ./producer
    depends_on:
      - broker

  consumer:
    build: ./consumer
    depends_on:
      - broker

Producer:

from confluent_kafka import Producer
import logging

producer = Producer({"bootstrap.servers": "broker:9092",
                     'request.timeout.ms': 100000, 'delivery.timeout.ms': 100000})
logging.basicConfig(level=logging.INFO)


def delivery_callback(err, msg):
    if err is not None:
        logging.info(f"Failed to deliver message: {err}")
    else:
        logging.info(f"Message delivered to {msg.topic()} [{msg.partition()}]")


for i in range(10):
    logging.info("Produce message")

    producer.produce(
        "test-topic", f"Message {i}".encode("utf-8"), callback=delivery_callback)
    logging.info("Before poll Produce message")
    producer.poll(0)
    logging.info("After poll Produce message")

producer.flush()

Consumer:

from confluent_kafka import Consumer, KafkaError

consumer = Consumer({"bootstrap.servers": "broker:9092",
                    "group.id": "my-consumer-group"})

consumer.subscribe(["test-topic"])

while True:
    msg = consumer.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print("Reached end of partition")
        else:
            print(f"Error while consuming message: {msg.error()}")
    else:
        print(f"Received message: {msg.value().decode('utf-8')}")

consumer.close()

I tried creating a bridge network connecting ZooKeeper and the broker, but nothing changed. I also tried using Apache UI, but it does not seem to detect anything either.

Your schema registry correctly uses port 29092 to connect to Kafka, so your Python apps should use the same. More importantly, `depends_on` will not wait for the broker inside the container to be ready, so your Python apps need to wait, or loop and retry. – OneCricketeer Jul 25 '23 at 13:01
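
The "wait, or loop and retry" suggestion could be sketched roughly as below. This is only an illustration, not a confirmed fix for the setup above: `wait_for_port` is a hypothetical helper name, the timeout values are arbitrary, and an open TCP port only shows that something is listening, not that the broker has finished starting.

```python
import socket
import time


def wait_for_port(host, port, timeout_s=60.0, interval_s=1.0):
    """Return True once a TCP connection to host:port succeeds, False on timeout.

    Hypothetical helper: it only checks that the port accepts connections,
    which is a rough stand-in for "the broker is ready".
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # create_connection raises OSError if nothing is listening yet
            # or if the hostname cannot be resolved yet.
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)


# Possible use in the producer or consumer, before creating the client
# (assumes the internal listener broker:29092 mentioned in the comment):
#
# if not wait_for_port("broker", 29092):
#     raise SystemExit("broker:29092 not reachable")
# producer = Producer({"bootstrap.servers": "broker:29092"})
```

The helper uses only the standard library, so it can run before `confluent_kafka` is imported at all. Retrying the `produce`/`flush` calls themselves would be another option, since the client also reconnects on its own once the broker address is resolvable.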
