I have a Kafka producer written in Python that I have added to my docker-compose.yml.

Producer:

import os, csv, avro.schema, json
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
from kafka import KafkaProducer
from collections import namedtuple

output_loc = '{}/avro.avro'.format(os.path.dirname(__file__))
CSV = '{}/oscar_age_male.csv'.format(os.path.dirname(__file__))
fields = ("Index","Year", "Age", "Name", "Movie")
csv_record = namedtuple('csv_record', fields)

p = KafkaProducer(bootstrap_servers = ['localhost:9092', 'kafka:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8'))

def read_csv(path):
    with open(path, 'r', newline='') as data:
        data.readline()  # skip the header row
        reader = csv.reader(data, delimiter=",")
        for row in map(csv_record._make, reader):
            yield row

def parse_schema(path='{}/schema.avsc'.format(os.path.dirname(__file__))):
    with open(path, 'r') as data:
        return avro.schema.parse(data.read())

def serialise_records_and_send(records, outpath=output_loc):
    schema = parse_schema()
    # DataFileWriter needs a binary file handle
    with open(outpath, 'wb') as out:
        writer = DataFileWriter(out, DatumWriter(), schema)
        for record in records:
            record = dict((f, getattr(record, f)) for f in record._fields)
            writer.append(record)
            msg = p.send(topic='test', value=record)
            metadt = msg.get()  # block until the broker acknowledges the send
            print(metadt.topic)
            print(metadt.partition)
        writer.close()  # flush the Avro container to disk

serialise_records_and_send(read_csv(CSV))

When I run docker-compose, my producer container fails because no brokers are available.

Can anyone enlighten me as to why the brokers aren't available?

When I run the producer locally from my IDE I can connect, so I'm unsure what's missing.

docker-compose.yml

version: '2'
services:
  zookeeper:
    image: "confluentinc/cp-zookeeper:5.4.0"
    hostname: zookeeper
    ports:
      - '32181:32181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    extra_hosts:
      - "moby:127.0.0.1"  
      
  kafka:
    image: "confluentinc/cp-enterprise-kafka:5.4.0"
    hostname: kafka
    ports:
      - '9092:9092'
      - '29092:29092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:32181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    extra_hosts:
      - "moby:127.0.0.1"  
      
  schema-registry:
    image: "confluentinc/cp-schema-registry:latest"
    hostname: schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - '8081:8081'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: GET,POST,PUT,OPTIONS
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*' 
    extra_hosts:
      - "moby:127.0.0.1"

      
  kafdrop:
    image: "obsidiandynamics/kafdrop"
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka:29092
      JVM_OPTS: "-Xms32M -Xmx64M"
      SERVER_SERVLET_CONTEXTPATH: "/"
    depends_on:
      - kafka
  
  producer:
    image: "producer"
    ports: 
      - '5000:5000'
    environment: 
      KAFKA_BROKERCONNECT: kafka:29092
    depends_on: 
      - kafka
Harvey

2 Answers


You're hardcoding the broker host and port:

p = KafkaProducer(bootstrap_servers = ['localhost:9092', 'kafka:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8'))

but when running under Docker you need to connect to the listener that is correct for that network, namely kafka:29092 (which you have included as KAFKA_BROKERCONNECT, but which I don't see being read anywhere in your code).

So either update your code to use the environment variable, or change your hardcoded list to

p = KafkaProducer(bootstrap_servers = ['localhost:9092', 'kafka:29092'], value_serializer=lambda v: json.dumps(v).encode('utf-8'))
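
If you go the environment-variable route, here is a minimal sketch of how that could look (this assumes docker-compose passes KAFKA_BROKERCONNECT into the producer container; the localhost fallback for local IDE runs is my addition, not part of the original code):

import os, json
from kafka import KafkaProducer

# Broker address injected by docker-compose, falling back to localhost
# so the same script still connects when run outside Docker (e.g. from an IDE)
bootstrap = os.environ.get('KAFKA_BROKERCONNECT', 'localhost:9092')

p = KafkaProducer(
    bootstrap_servers=bootstrap.split(','),  # handles comma-separated broker lists
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)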

See this blog for thorough background on Kafka listeners, Docker, etc.

Robin Moffatt

You've defined an environment variable for Kafka. You should use it:

import os
# Read the broker address injected by docker-compose (kafka:29092 inside the Compose network)
p = KafkaProducer(bootstrap_servers = [os.environ['KAFKA_BROKERCONNECT']])

You cannot use localhost while the code runs inside a container, because there it refers to the producer container itself, not to the broker.
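
Note that os.environ['KAFKA_BROKERCONNECT'] raises a KeyError when the variable is unset, so either export it (e.g. KAFKA_BROKERCONNECT=localhost:9092) when running outside Docker, or use os.environ.get with a default. Either way, keep the value_serializer from your original code so the records are still JSON-encoded.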
However, a more general solution to the code you've shown is to use the Kafka Connect CSV SpoolDir connector, which streams CSV files from a directory into a topic without any custom producer code.
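
As a rough illustration, here is a sketch of a SpoolDir CSV source connector config (this assumes the kafka-connect-spooldir plugin is installed on a Kafka Connect worker; the connector name, topic, and paths below are placeholders):

{
  "name": "oscar-csv-source",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "topic": "test",
    "input.path": "/data/input",
    "finished.path": "/data/finished",
    "error.path": "/data/error",
    "input.file.pattern": ".*\\.csv",
    "csv.first.row.as.header": "true",
    "schema.generation.enabled": "true"
  }
}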

OneCricketeer