I've dockerize kafka using wurstmeister/kafka
image. I'm new to Kafka and integrating with my django project. I've created a management commands for consumer.py and producer.py.
docker-compose.yml
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:29092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "topic_test:1:1"
restart: unless-stopped
volumes:
- /var/run/docker.sock:/var/run/docker.sock
consumer.py
import logging
from django.conf import settings
from django.core.management import BaseCommand
from kafka import KafkaConsumer
from json import loads
from time import sleep
logger = logging.getLogger(__name__)
class Command(BaseCommand):
def handle(self, *args, **options):
print("Loading Consumer Kafka......")
consumer = KafkaConsumer(
'topic_test',
bootstrap_servers=[f'{settings.KAFKA_HOST}:{settings.KAFKA_PORT}'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group-id',
value_deserializer=lambda x: loads(x.decode('utf-8')),
api_version=(0, 11, 5),
)
for event in consumer:
event_data = event.value
logger.info(f'Data Received on Kafka .... .: {event_data}')
print(f'Data Received on Kafka .... .: {event_data} \n\n')
sleep(1)
producer.py
import logging
from time import sleep
from json import dumps
from django.conf import settings
from django.core.management import BaseCommand
from kafka import KafkaProducer
logger = logging.getLogger(__name__)
class Command(BaseCommand):
def on_send_success(self, record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def on_send_error(self, err):
logger.error(f'Failed to Send Message on Kafka Consumer : {err}')
# handle exception
def handle(self, *args, **options):
print(f'Sending Kafka Data .... {settings.KAFKA_HOST}:{settings.KAFKA_PORT}')
producer = KafkaProducer(
bootstrap_servers=[f'{settings.KAFKA_HOST}:{settings.KAFKA_PORT}'],
value_serializer=lambda x: dumps(x).encode('utf-8'),
api_version=(0, 11, 5),
)
for _ in range(10):
data = {'data': 'Hello'}
producer.send(f'{settings.KAFKA_TOPIC}', value=data).add_callback(
self.on_send_success).add_errback(self.on_send_error)
logger.info(f'Sending Kafka Data .... {data}')
print(f'Sending Kafka Data .... {data}\n\n')
producer.flush()
sleep(0.5)
Consider these variables values that I used in my settings.
settings.KAFKA_TOPIC = 'test_topic'
settings.KAFKA_HOST = 'kafka'
settings.KAFKA_PORT = '29092'
docker-compose up zookeeper
docker-compose up kafka
Then I run the cmd. for producer.
python manage.py producer
I got this error on producer logs,
** Produced messages to topic-partition TopicPartition(topic='topic_test', partition=0) with base offset -1 log start offset None and error None. Failed to Send Message on Kafka Consumer : KafkaTimeoutError: Batch for TopicPartition(topic='topic_test', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time Expired 1 batches in accumulator KafkaTimeoutError: Batch for TopicPartition(topic='topic_test', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time **