So I'm facing an odd situation where I can connect to my kafka broker but can´t produce to the topic I created. I have three images running on same network: zookeeper,kafka and my app. I have a simple producer in my app that goes like this:
import json
from kafka import KafkaProducer
from loguru import logger
producer = KafkaProducer(
bootstrap_servers=["kafka:9092"],
api_version=(0, 10, 21),
)
def on_send_error(excp):
logger.error(f"I am an errback {excp}", exc_info=excp)
# handle exception
def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def produce_to_kafka():
print(producer.bootstrap_connected())
data = json.dumps({"message": f"Produzindo para o Kafka mensagem 1"})
producer.send("test_producer", value=data.encode()).add_callback(
on_send_success
).add_errback(on_send_error)
producer.flush()
When I try to run this code segment inside my app it prints True
in the bootstrap_connected
function but then gives me the following error when calling send()
:
ERROR | apps.my_app.producer:on_send_error:12 I am an errback KafkaTimeoutError: Batch for TopicPartition(topic='test_producer', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time
My compose
setup is the following:
services:
myapp:
image: <my_app:latest>
build:
context: ..
dockerfile: docker/Dockerfile
args:
DEPLOYMENT_MODE: develop
stop_signal: SIGINT
environment:
ports:
- "8000:8000"
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
expose:
- "9093"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Can someone help me please?
I tried changing the hosts on docker-compose.yml
to many different communication but nothing worked.