
I am using docker-compose to create two containers:

  1. Kafka
  2. Pyspark

I am performing a POC with a Python producer running inside the Pyspark container and writing to the Kafka container, but I am getting the error `kafka.errors.NoBrokersAvailable: NoBrokersAvailable`.

version: '3'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2182:2181"

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  pyspark:
    image: jupyter/pyspark-notebook:latest
    container_name: pyspark
    ports:
      - "8888:8888"
      - "4040:4040"
      - "4141:4141"
      - "4242:4242"
    volumes:
      - ./spark:/home/jovyan/work
    environment:
      JUPYTER_ENABLE_LAB: "yes"
    
    depends_on:
      - kafka
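
For comparison, the wurstmeister image also documents a split-listener setup, in which the broker advertises one address to other containers on the compose network and a different one to clients on the host. The snippet below is only an illustrative sketch of that documented pattern (the listener names INSIDE/OUTSIDE and the 9094 host port are arbitrary choices of mine), not a verified fix:

```yaml
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9094:9094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertise 'kafka:9092' to containers on the compose network
      # and 'localhost:9094' to clients on the host machine.
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
```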

#data-generator.py
import random
import string


user_ids = list(range(1, 101))
recipient_ids = list(range(1, 101))

def generate_message() -> dict:
    random_user_id = random.choice(user_ids)

    # Copy the recipient list so the original isn't mutated
    recipient_ids_copy = recipient_ids.copy()

    # A user can't send a message to themselves
    recipient_ids_copy.remove(random_user_id)
    random_recipient_id = random.choice(recipient_ids_copy)

    # Generate a random message 
    message = ''.join(random.choice(string.ascii_letters) for i in range(32))

    return {
        'user_id' : random_user_id,
        'recipient_id' : random_recipient_id,
        'message' : message
    }

#Testing 

# if __name__ == '__main__':
#     print(generate_message())
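
The copy-and-remove step above can also be expressed as a filter. `pick_recipient` below is just a hypothetical helper of mine illustrating the same sender-exclusion invariant, not part of the original script:

```python
import random

def pick_recipient(user_id, candidates):
    # Build the pool without the sender, so a user never messages themselves
    # (equivalent to copying the list and removing user_id, without mutation).
    pool = [c for c in candidates if c != user_id]
    return random.choice(pool)

ids = list(range(1, 101))
samples = [pick_recipient(7, ids) for _ in range(1000)]
assert 7 not in samples  # the sender is never chosen as recipient
```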
   
#producer.py
import time
import json 
import random
from datetime import datetime
from data_generator import generate_message
from kafka import KafkaProducer

#Messages will be serialized as JSON 
def serializer(message):
    return json.dumps(message).encode('utf-8')

#Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=serializer
)

if __name__ == '__main__':
    #Infinite loop - runs until you kill the program
    
    while True:
        # Generate a message
        dummy_message = generate_message()
        
        # Send it to our 'messages' topic
        print(f'Producing message @ {datetime.now()} | Message = {str(dummy_message)}')
        ack = producer.send('messages', dummy_message)
        metadata = ack.get()
        # print(metadata)
        #Sleep for a random number of seconds
        time_to_sleep = random.randint(1, 11)
        time.sleep(time_to_sleep)
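
To rule out serialization as the cause, the serializer round-trips cleanly on its own. This is the same function as above, exercised without a broker:

```python
import json

def serializer(message):
    # Serialize the dict to UTF-8-encoded JSON bytes, as KafkaProducer expects
    return json.dumps(message).encode('utf-8')

payload = serializer({'user_id': 1, 'recipient_id': 2, 'message': 'abc'})
assert isinstance(payload, bytes)
assert json.loads(payload.decode('utf-8')) == {
    'user_id': 1, 'recipient_id': 2, 'message': 'abc'
}
```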

I tried multiple approaches:

  1. bootstrap_servers = ['localhost:9092']
  2. bootstrap_servers = ['kafka:9092']
  3. bootstrap_servers = ['ip:9092'], using the IP retrieved from docker inspect for the Kafka container; this results in a timeout.
  4. telnet from the Pyspark container results in connection refused.
  5. Added depends_on: kafka to the compose file.
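
The telnet check above can be reproduced from Python as well. This is a hypothetical helper of mine, a TCP-level reachability probe only; it confirms the port accepts connections, not that Kafka's advertised listeners are usable:

```python
import socket

def broker_reachable(host, port, timeout=3.0):
    # Attempt a plain TCP connection; any OSError (refused, timeout,
    # DNS failure) is treated as "not reachable".
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Inside the pyspark container, 'kafka' should resolve via the compose network:
# print(broker_reachable('kafka', 9092))
```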

The Kafka producer works absolutely fine from the local machine to the Kafka container. Please find the code snippets for docker-compose, data-generator.py, and producer.py above.

Thanks

  • Does this answer your question? [Kafka access inside and outside docker](https://stackoverflow.com/questions/53247553/kafka-access-inside-and-outside-docker) – daniu Jan 27 '23 at 18:38
