
I've dockerized Kafka using the wurstmeister/kafka image. I'm new to Kafka and I'm integrating it with my Django project. I've created management commands for consumer.py and producer.py.

docker-compose.yml

services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    depends_on:
      - zookeeper
    restart: unless-stopped
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "topic_test:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

consumer.py

import logging

from django.conf import settings
from django.core.management import BaseCommand

from kafka import KafkaConsumer
from json import loads
from time import sleep

logger = logging.getLogger(__name__)


class Command(BaseCommand):

    def handle(self, *args, **options):
        print("Loading Consumer Kafka......")
        consumer = KafkaConsumer(
            'topic_test',
            bootstrap_servers=[f'{settings.KAFKA_HOST}:{settings.KAFKA_PORT}'],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id='my-group-id',
            value_deserializer=lambda x: loads(x.decode('utf-8')),
            api_version=(0, 11, 5),
        )

        for event in consumer:
            event_data = event.value
            logger.info(f'Data Received on Kafka .... .: {event_data}')
            print(f'Data Received on Kafka .... .: {event_data} \n\n')
            sleep(1)

producer.py

import logging
from time import sleep
from json import dumps

from django.conf import settings
from django.core.management import BaseCommand
from kafka import KafkaProducer

logger = logging.getLogger(__name__)


class Command(BaseCommand):
    def on_send_success(self, record_metadata):
        print(record_metadata.topic)
        print(record_metadata.partition)
        print(record_metadata.offset)

    def on_send_error(self, err):
        logger.error(f'Failed to Send Message on Kafka Consumer : {err}')
        # handle exception

    def handle(self, *args, **options):
        print(f'Sending Kafka Data .... {settings.KAFKA_HOST}:{settings.KAFKA_PORT}')
        producer = KafkaProducer(
            bootstrap_servers=[f'{settings.KAFKA_HOST}:{settings.KAFKA_PORT}'],
            value_serializer=lambda x: dumps(x).encode('utf-8'),
            api_version=(0, 11, 5),
        )

        for _ in range(10):
            data = {'data': 'Hello'}
            producer.send(f'{settings.KAFKA_TOPIC}', value=data).add_callback(
                self.on_send_success).add_errback(self.on_send_error)
            logger.info(f'Sending Kafka Data .... {data}')
            print(f'Sending Kafka Data .... {data}\n\n')
            producer.flush()
            sleep(0.5)

These are the variable values I used in my settings:

settings.KAFKA_TOPIC = 'topic_test'
settings.KAFKA_HOST = 'kafka'
settings.KAFKA_PORT = '29092'

I started the containers with:

  • docker-compose up zookeeper
  • docker-compose up kafka
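
As a sanity check before running the management commands (a minimal sketch, not part of the original setup), the broker can be asked for its topic list with kafka-python; which address works depends on where the check runs:

from kafka import KafkaConsumer

# From the host machine the OUTSIDE listener is localhost:29092;
# from another container on the same Compose network it would be kafka:9092.
consumer = KafkaConsumer(bootstrap_servers=['localhost:29092'])
print(consumer.topics())  # should include 'topic_test' once the broker is reachable
consumer.close()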

Then I ran the producer command: python manage.py producer

I got this error in the producer logs:


Produced messages to topic-partition TopicPartition(topic='topic_test', partition=0) with base offset -1 log start offset None and error None.
Failed to Send Message on Kafka Consumer : KafkaTimeoutError: Batch for TopicPartition(topic='topic_test', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time
Expired 1 batches in accumulator
KafkaTimeoutError: Batch for TopicPartition(topic='topic_test', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time
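
For reference, a hedged sketch of a blocking variant of the send call (it assumes the same producer object as in producer.py and is not part of the original code): future.get() re-raises the KafkaTimeoutError as a normal exception instead of only reporting it through the errback, which makes the failure easier to trace:

from kafka.errors import KafkaError

# Sketch only: blocks until the broker acknowledges or the batch expires.
future = producer.send(settings.KAFKA_TOPIC, value={'data': 'Hello'})
try:
    record_metadata = future.get(timeout=30)  # raises KafkaTimeoutError on expiry
    print(record_metadata.topic, record_metadata.partition, record_metadata.offset)
except KafkaError as err:
    logger.error(f'Blocking send failed: {err}')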

  • Please remove `KAFKA_ADVERTISED_HOST_NAME` and try again connecting to `kafka:9092` (assuming your code runs in a container itself). Otherwise, you need to connect to `localhost:29092` outside of Docker, and remove `- "9092:9092"` since it does nothing useful outside of the container. – OneCricketeer Sep 13 '22 at 19:37
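
Following that comment, a sketch of what the adjusted settings might look like (an assumption about where the management commands run, not a confirmed fix): drop KAFKA_ADVERTISED_HOST_NAME from docker-compose.yml and pick the bootstrap address that matches the client's location:

# If the Django management commands run inside a container on the same
# Compose network, use the INSIDE listener:
KAFKA_HOST = 'kafka'
KAFKA_PORT = '9092'

# If they run directly on the host, use the OUTSIDE listener instead:
# KAFKA_HOST = 'localhost'
# KAFKA_PORT = '29092'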
