
Although there are a number of similar questions on Stack Overflow, none of the recommendations helped. I am too new to Kafka to understand what exactly is wrong.

So here is my Kafka setup.

version: "3.9"
services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - "pocnetwork"

  kafka:
    image: bitnami/kafka:2.8.1
    ports:
      - "9092:9092"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_LISTENERS=PLAINTEXT://:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
    depends_on:
      - zookeeper
    networks:
      - "pocnetwork"

networks:
  pocnetwork:
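
(For context: with `KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092`, the broker tells clients to reconnect to the hostname `kafka`, which only resolves inside the Docker network. A common pattern, shown here only as a hedged sketch and not part of the original setup, is to advertise two listeners, one for other containers and one for the host machine:

```yaml
# Sketch only: dual listeners so both containers (kafka:9092) and the
# host machine (localhost:9093) can reach the broker. The listener
# names CLIENT/EXTERNAL are illustrative, not from the original post.
environment:
  - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
  - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
  - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
  - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
ports:
  - "9093:9093"
```

With something like this, a client on the host would connect to `localhost:9093` rather than an IP address.)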

I did not define any specific IP for ZooKeeper or Kafka, but at least the producer and consumer can connect to Kafka. I also did not perform any group or partition configuration.

The producer and consumer are basically the ones from a tutorial I found on the internet.

from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['192.168.10.140:9092'],
                         api_version=(0, 11, 5),
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

for e in range(1000):
    data = {'number' : e}
    print(f"Sending {e}")
    producer.send('numtest', value=data)
    print(f"Sent {e}")
    sleep(1)

Consumer

from kafka import KafkaConsumer
from pymongo import MongoClient
from json import loads


consumer = KafkaConsumer(
    'numtest',
     bootstrap_servers=['192.168.10.140:9092'],
#     api_version=(0,11,5),
     auto_offset_reset='earliest',
#     auto_commit_enable = False,
     enable_auto_commit=True,
     auto_commit_interval_ms=100,
     group_id='my-group',
#     group_id=None,
     consumer_timeout_ms=1000,
     value_deserializer=lambda x: loads(x.decode('utf-8')))


print("Start consuming")
consumer.poll()
for message in consumer:
    print(f'Received message {message.value}')
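
(As a side note, the producer's serializer and this deserializer are mirror images of each other. A quick standalone check, no broker involved and purely illustrative, confirms a payload survives the round trip, so serialization is unlikely to be the problem:

```python
from json import dumps, loads

# The same lambdas as passed to KafkaProducer / KafkaConsumer above.
serialize = lambda x: dumps(x).encode('utf-8')
deserialize = lambda x: loads(x.decode('utf-8'))

payload = {'number': 42}
# Serialization produces bytes; deserialization restores the original dict.
assert isinstance(serialize(payload), bytes)
assert deserialize(serialize(payload)) == payload
```

)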

Does anyone have any idea what is wrong with this setup or code? I am pretty sure it is a stupid newbie issue, but I have already spent half a day fighting with it.

  • You can try to consume the topic from the command line. https://kafka.apache.org/quickstart#quickstart_consume – kometen Oct 03 '21 at 07:17
  • How exactly do you know the code works? You're using Docker incorrectly if your Python code is on the same machine. You should not be using IP addresses at all. Secondly, refer linked post what advertised listeners actually mean. You must advertise localhost in order to connect to localhost... Secondly, you should `producer.flush()` after the loop and reset the group id you're using... If you don't understand all this, maybe try installing Kafka without Docker first to learn it – OneCricketeer Oct 03 '21 at 16:35
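The command-line check suggested in the first comment could look something like this (the container name is hypothetical and depends on the compose project; the Bitnami image ships the Kafka CLI tools on its `PATH`):

```shell
# Sketch: consume the topic from inside the broker container to rule
# out the Python client. Replace <kafka-container> with the actual
# container name from `docker ps`.
docker exec -it <kafka-container> \
  kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic numtest --from-beginning
```

If messages appear here but not in the Python consumer, the problem is on the client/networking side rather than in the broker.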

0 Answers