Although there are a number of similar questions on Stack Overflow, none of the recommendations helped. I am too new to Kafka to understand what exactly is wrong.
Here is my Kafka setup (Docker Compose file):
    version: "3.9"
    services:
      zookeeper:
        image: "bitnami/zookeeper:latest"
        ports:
          - "2181:2181"
        environment:
          - ALLOW_ANONYMOUS_LOGIN=yes
        networks:
          - "pocnetwork"
      kafka:
        image: bitnami/kafka:2.8.1
        ports:
          - "9092:9092"
        environment:
          - KAFKA_BROKER_ID=1
          - KAFKA_LISTENERS=PLAINTEXT://:9092
          - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
          - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
          - ALLOW_PLAINTEXT_LISTENER=yes
          - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
        depends_on:
          - zookeeper
        networks:
          - "pocnetwork"
    networks:
      pocnetwork:
I did not define a specific IP for ZooKeeper or Kafka, but at least the producer and consumer can connect to Kafka. I also did not configure any consumer groups or partitions.
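One check that might help narrow this down: as far as I understand, a Kafka client uses `bootstrap_servers` only for the first connection and then connects to whatever address the broker advertises. The sketch below uses only the standard library and tests whether the advertised host name from the compose file resolves on the machine that runs the producer and consumer. The helper names `parse_listener` and `resolves` are my own and not part of kafka-python.

```python
import socket

# Value of KAFKA_ADVERTISED_LISTENERS copied from the compose file above.
ADVERTISED = "PLAINTEXT://kafka:9092"


def parse_listener(listener):
    """Split a 'PROTOCOL://host:port' listener string into (host, port)."""
    _, _, address = listener.partition("://")
    host, _, port = address.rpartition(":")
    return host, int(port)


def resolves(host):
    """Return True if this machine can resolve the host name to an address."""
    try:
        socket.getaddrinfo(host, None)
        return True
    except socket.gaierror:
        return False


if __name__ == "__main__":
    host, port = parse_listener(ADVERTISED)
    print(f"Advertised address {host}:{port} resolves here: {resolves(host)}")
```

If this prints `False` on the client machine, the clients may be able to reach the bootstrap address but not the advertised one, which would be worth mentioning in an answer.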
The producer and consumer are basically the ones from a tutorial I found on the internet.
Producer:
    from time import sleep
    from json import dumps
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=['192.168.10.140:9092'],
                             api_version=(0,11,5),
                             value_serializer=lambda x:
                             dumps(x).encode('utf-8'))

    for e in range(1000):
        data = {'number' : e}
        print(f"Sending {e}")
        producer.send('numtest', value=data)
        print(f"Sent {e}")
        sleep(1)
Consumer:
    from kafka import KafkaConsumer
    from pymongo import MongoClient
    from json import loads

    consumer = KafkaConsumer(
        'numtest',
        bootstrap_servers=['192.168.10.140:9092'],
        # api_version=(0,11,5),
        auto_offset_reset='earliest',
        # auto_commit_enable = False,
        enable_auto_commit=True,
        auto_commit_interval_ms=100,
        group_id='my-group',
        # group_id=None,
        consumer_timeout_ms=1000,
        value_deserializer=lambda x: loads(x.decode('utf-8')))

    print("Start consuming")
    consumer.poll()
    for message in consumer:
        print(f'Received message {message.value}')
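To rule out the serializer and deserializer as the cause, the two lambdas can be checked without a broker. This is a minimal sketch that assumes the same `dumps`/`loads` logic as in the producer and consumer above. The function names `serialize`, `deserialize` and `round_trip` are mine.

```python
from json import dumps, loads


def serialize(value):
    """Same logic as the producer's value_serializer."""
    return dumps(value).encode('utf-8')


def deserialize(raw):
    """Same logic as the consumer's value_deserializer."""
    return loads(raw.decode('utf-8'))


def round_trip(value):
    """Serialize and immediately deserialize a value, with no Kafka involved."""
    return deserialize(serialize(value))


if __name__ == "__main__":
    print(round_trip({'number': 7}))
```

If the round trip returns the original dictionary, the problem is more likely in the broker configuration or the consumer settings than in the JSON handling.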
Does anyone have an idea what is wrong with this setup or code? I am pretty sure this is a stupid newbie issue, but I have already spent half a day fighting with it.