I have a kafka producer written in python that I have added to docker-compose.yml
Producer:
import os, csv, avro.schema, json
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from kafka import KafkaProducer
from collections import namedtuple
output_loc = '{}/avro.avro'.format(os.path.dirname(__file__))
CSV = '{}/oscar_age_male.csv'.format(os.path.dirname(__file__))
fields = ("Index","Year", "Age", "Name", "Movie")
csv_record = namedtuple('csv_record', fields)
p = KafkaProducer(bootstrap_servers = ['localhost:9092', 'kafka:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8'))
def read_csv(path):
with open(path, 'rU') as data:
data.readline()
reader = csv.reader(data, delimiter=",")
for row in map(csv_record._make, reader):
yield row
def parse_schema(path='{}/schema.avsc'.format(os.path.dirname(__file__))):
with open(path, 'r') as data:
return avro.schema.parse(data.read())
def serilialise_records_and_send(records, outpath=output_loc):
schema = parse_schema()
with open(outpath, 'w') as out:
writer = DataFileWriter(out, DatumWriter(), schema)
for record in records:
record = dict((f, getattr(record, f)) for f in record._fields)
writer.append(record)
msg = p.send(topic='test', value=record)
metadt = msg.get()
print(metadt.topic)
print(metadt.partition)
serilialise_records_and_send(read_csv(CSV))
when I run the docker-compose my producer image fails due to no brokers available.
can anyone enlighten me as to why the brokers aren't available?
when I run the producer locally from IDE I can connect so unsure what's missing
docker-compose.yml
version: '2'
services:
zookeeper:
image: "confluentinc/cp-zookeeper:5.4.0"
hostname: zookeeper
ports:
- '32181:32181'
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
extra_hosts:
- "moby:127.0.0.1"
kafka:
image: "confluentinc/cp-enterprise-kafka:5.4.0"
hostname: kafka
ports:
- '9092:9092'
- '29092:29092'
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:32181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'false'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
extra_hosts:
- "moby:127.0.0.1"
schema-registry:
image: "confluentinc/cp-schema-registry:latest"
hostname: schema-registry
depends_on:
- zookeeper
- kafka
ports:
- '8081:8081'
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: GET,POST,PUT,OPTIONS
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
extra_hosts:
- "moby:127.0.0.1"
kafdrop:
image: "obsidiandynamics/kafdrop"
ports:
- '9000:9000'
environment:
KAFKA_BROKERCONNECT: kafka:29092
JVM_OPTS: "-Xms32M -Xmx64M"
SERVER_SERVLET_CONTEXTPATH: "/"
depends_on:
- kafka
producer:
image: "producer"
ports:
- '5000:5000'
environment:
KAFKA_BROKERCONNECT: kafka:29092
depends_on:
- kafka