
Setting Up

I am using the confluent/kafka images from Docker Hub to start ZooKeeper and Kafka in two separate containers. The commands I used to start the containers are as follows:

docker run --rm --name zookeeper -p 2181:2181 confluent/zookeeper
docker run --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper confluent/kafka

This leaves two containers, zookeeper and kafka, running.

Note that I have mapped ports 2181 and 9092 of the containers to the same ports on my host machine. I verified that this mapping works by opening localhost:2181 and localhost:9092 in my browser; each attempt printed errors in the corresponding container's terminal, so the connections are reaching the containers.
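The same connectivity check can be scripted instead of using a browser; here is a hedged sketch that relies on bash's /dev/tcp pseudo-device (a bash-only feature, so it assumes bash, not plain sh):

```shell
#!/usr/bin/env bash
# Return success if a TCP connection to host ($1) and port ($2) succeeds.
port_open() {
  (exec 3<>"/dev/tcp/$1/$2") 2>/dev/null
}

port_open localhost 2181 && echo "zookeeper port reachable" || echo "zookeeper port closed"
port_open localhost 9092 && echo "kafka port reachable" || echo "kafka port closed"
```

Note this only proves a TCP connection can be opened to the mapped port; it says nothing about whether the broker's advertised address is usable from the host, which is the crux of the issue below.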

Then I created a topic by issuing the following command on my host machine:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

This succeeded, and I verified it by listing the topics with the following command:

./bin/kafka-topics.sh --list --zookeeper localhost:2181

Now the ISSUE:

I am trying to produce some messages to the broker with the following command:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

I am getting the following exception:

[2017-03-02 20:36:02,376] WARN Failed to send producer request with correlation id 2 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:594)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

I read some threads on the internet suggesting that I update my hosts file. If so, what entry do I have to put in my hosts file?

Also, some threads suggested setting the ADVERTISED_HOST entry to the correct IP in the configuration file. Which configuration file? Where do I make the update?

If it's the server.properties file used by the Kafka broker, I did look inside the container created by the confluent/kafka image. The file looks like this:

socket.send.buffer.bytes=102400
delete.topic.enable=true
socket.request.max.bytes=104857600
log.cleaner.enable=true
log.retention.check.interval.ms=300000
log.retention.hours=168
num.io.threads=8
broker.id=0
log4j.opts=-Dlog4j.configuration\=file\:/etc/kafka/log4j.properties
log.dirs=/var/lib/kafka
auto.create.topics.enable=true
num.network.threads=3
socket.receive.buffer.bytes=102400
log.segment.bytes=1073741824
num.recovery.threads.per.data.dir=1
num.partitions=1
zookeeper.connection.timeout.ms=6000
zookeeper.connect=zookeeper\:2181
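For reference, if the threads I read are right, the setting would presumably be added to this file. On the older 0.x brokers, the host-facing address is controlled by `advertised.host.name` (newer Kafka versions use `advertised.listeners` instead); a hypothetical entry for a single-machine, host-mapped setup might look like:

```
advertised.host.name=localhost
```

I have not confirmed this is what the confluent/kafka image expects, which is part of my question.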

Any suggestions on how I can overcome this and make producing to and consuming from the Kafka containers possible from my host machine?

Thanks a lot!

  • Possible duplicate of [Connect to Kafka running in Docker from local machine](https://stackoverflow.com/questions/51630260/connect-to-kafka-running-in-docker-from-local-machine) – OneCricketeer Sep 27 '18 at 21:36

1 Answer


I was able to figure it out within seconds of posting this question.

I had to get the hostname of the container in which the broker was running by issuing, inside that container:

echo $HOSTNAME

Then I updated the /etc/hosts file on my host machine with loopback entries:

127.0.0.1     KAFKA_CONTAINER_HOSTNAME
127.0.0.1     ZOOKEEPER_CONTAINER_HOSTNAME

I had to do the same with the zookeeper container's hostname so that the consumer would also work without issue.
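Putting the two steps together, here is a hedged sketch. The container names `kafka` and `zookeeper` come from the `docker run` commands above; the `hosts_entry` helper is my own name for illustration, not a Docker command:

```shell
#!/usr/bin/env bash
# Build a loopback /etc/hosts line for a given container hostname.
hosts_entry() {
  printf '127.0.0.1\t%s\n' "$1"
}

# With Docker running, the container hostnames can be fetched from the host,
# without opening a shell in the container, and appended to /etc/hosts:
#   hosts_entry "$(docker exec kafka hostname)"     | sudo tee -a /etc/hosts
#   hosts_entry "$(docker exec zookeeper hostname)" | sudo tee -a /etc/hosts

hosts_entry "0123456789ab"   # example container hostname
```

By default a container's hostname is its short container ID, so the entry maps that ID back to 127.0.0.1 on the host, which is where the mapped ports actually live.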

Cheers!

Shabirmean