I am running the below code in docker container to create producer and to send message to particular topic but getting the error "kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.".
docker-compose.yml
version: "3.9"
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
container_name: zookeeper3
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:latest'
container_name: kakfa3
ports:
- '9092:9092'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_LISTENERS=PLAINTEXT://:9092
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
python:
container_name: python3
build: .
ports:
- '5000:5000'
depends_on:
- kafka
Python code:
from kafka import KafkaProducer
import csv
import json
#Creating producer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda m: json.dumps(m).encode('utf-8'))
#Declaring variable to store json message
jsonArray =[]
#Reading csv file and append data as json
with open('sample.csv', 'r') as file:
csvReader = csv.DictReader(file)
for row in csvReader:
#add this python dict to json array
jsonArray.append(row)
#Send message to topic one by one
for i in range(len(jsonArray)):
producer.send('part1',jsonArray[i])
producer.flush()