I'm facing an odd situation: I can connect to my Kafka broker but can't produce to the topic I created. I have three containers running on the same network: ZooKeeper, Kafka, and my app. My app has a simple producer that looks like this:

import json
from kafka import KafkaProducer
from loguru import logger

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    api_version=(0, 10, 21),
)


def on_send_error(excp):
    logger.error(f"I am an errback {excp}", exc_info=excp)
    # handle exception


def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)


def produce_to_kafka():
    print(producer.bootstrap_connected())
    data = json.dumps({"message": "Producing message 1 to Kafka"})
    producer.send("test_producer", value=data.encode()).add_callback(
        on_send_success
    ).add_errback(on_send_error)
    producer.flush()

When I run this code inside my app, producer.bootstrap_connected() prints True, but the message sent with send() then fails with the following error:

ERROR | apps.my_app.producer:on_send_error:12  I am an errback KafkaTimeoutError: Batch for TopicPartition(topic='test_producer', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time

My compose setup is the following:

services:
  myapp:
    image: <my_app:latest>
    build:
      context: ..
      dockerfile: docker/Dockerfile
      args:
        DEPLOYMENT_MODE: develop
    stop_signal: SIGINT
    environment:

    ports:
    - "8000:8000"

  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: kafka
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Can someone help me please?

I tried changing the hosts in docker-compose.yml to several different combinations, but nothing worked.

1) The Kafka container does not start immediately, so your Producer may fail to connect. 2) Your `INSIDE` listener is using port 9093. Therefore, that's what the other Dockerized clients need to use. 9092 forwarded to your host is for non-Docker clients. – OneCricketeer Jun 30 '23 at 19:55
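
A minimal sketch of what the comment above suggests, not a confirmed fix: bootstrap against the `INSIDE` listener (`kafka:9093`) and retry until the broker is up. Bootstrapping on `kafka:9092` reaches the `OUTSIDE` listener, which advertises `localhost:9092`, so the client is probably redirected to an address that is not reachable from inside the app container; that would explain why `bootstrap_connected()` is True while the batch still expires. The names `connect_with_retry`, `make_producer` and `main`, as well as the attempt count and delay, are assumptions made for illustration; only `KafkaProducer`, `send` and `flush` come from kafka-python.

```python
import time


def connect_with_retry(factory, attempts=10, delay_s=3.0, sleep=time.sleep):
    """Call factory() until it succeeds or the attempts run out.

    Hypothetical helper: it retries on any exception, which is broader than
    strictly needed (kafka-python is expected to raise NoBrokersAvailable
    while the broker is still starting).
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return factory()
        except Exception as exc:
            last_error = exc
            if attempt < attempts:
                sleep(delay_s)  # give the Kafka container time to start
    raise RuntimeError(
        f"broker not reachable after {attempts} attempts"
    ) from last_error


def make_producer():
    # Imported lazily so the retry helper itself does not need kafka-python.
    from kafka import KafkaProducer

    # Assumption: 9093 is the port of the INSIDE listener in the compose
    # file above; 9092 is the OUTSIDE listener meant for clients on the host.
    return KafkaProducer(bootstrap_servers=["kafka:9093"])


def main():
    producer = connect_with_retry(make_producer)
    producer.send("test_producer", value=b'{"message": "hello"}')
    producer.flush()
```

Calling `main()` from inside the app container would exercise the whole path. The retry loop only covers the startup race; it does not help if the advertised listener address is wrong.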
