I am using the docker-compose to create 2 containers.
- Kafka
- Pyspark
Am performing a POC using Python producer inside Pyspark --> Kafka docker but am getting the error kafka.errors.NoBrokersAvailable: NoBrokersAvailable.
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2182:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
pyspark:
image: jupyter/pyspark-notebook:latest
container_name: pyspark
ports:
- "8888:8888"
- "4040:4040"
- "4141:4141"
- "4242:4242"
volumes:
- ./spark:/home/jovyan/work
environment:
JUPYTER_ENABLE_LAB: "yes"
depends_on:
- kafka
#data-generator.py
import random
import string
user_ids = list(range(1, 101))
recipient_ids = list(range(1,101))
def generate_message() -> dict:
random_user_id = random.choice(user_ids)
# Copy the recipients array
recipient_ids_copy = recipient_ids.copy()
# User can't send message to himself
recipient_ids_copy.remove(random_user_id)
random_recipient_id = random.choice(recipient_ids_copy)
# Generate a random message
message = ''.join(random.choice(string.ascii_letters) for i in range(32))
return {
'user_id' : random_user_id,
'recipient_id' : random_recipient_id,
'message' : message
}
#Testing
# if __name__ == '__main__':
# print(generate_message())
#producer.py
import time
import json
import random
from datetime import datetime
from data_generator import generate_message
from kafka import KafkaProducer
#Messages will be serialized as JSON
def serializer(message):
return json.dumps(message).encode('utf-8')
#Kafka Producer
producer = KafkaProducer(
bootstrap_servers = ['localhost:9092'],
value_serializer = serializer
)
if __name__ == '__main__':
#Infinite loop - runs until you kill the program
while True:
# Generate a message
dummy_message = generate_message()
#Send it to our 'messages'
print(f'Producing message @ {datetime.now()} | Message = {str(dummy_message)}')
ack = producer.send('messages', dummy_message)
metadata = ack.get()
# print(metadata)
#Sleep for a random number of seconds
time_to_sleep = random.randint(1, 11)
time.sleep(time_to_sleep)
Tried multiple approaches
- bootstrap_servers = ['localhost:9092']
- bootstrap_servers = ['kafka:9092']
- bootstrap_servers = ['ip:9092'] using an IP retrieved from docker inspect for Kafka results in timeout
- telnet from Pyspark container results in connection refuse.
- Added depends_on : Kafka
The Kafka producer works absolutely fine from the local to Kafka container. Please find the code snippet below for docker-compose & producer.py
Thanks