I'm running Kafka in docker and I've a .NET application that I want to use to consume messages. I've followed following tutorials with no luck:
https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
Connect to Kafka running in Docker
Interact with kafka docker container from outside of docker host
On my consumer application I get the following error if I try to conenct directly to containers ip:
172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21502ms in state CONNECT)
Error: 1/1 brokers are down %3|1620652406.633|FAIL|rdkafka#consumer-1| [thrd:172.21.0.3:9092/bootstrap]: 172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21037ms in state CONNECT, 1 identical error(s) suppressed)
Error: 172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21037ms in state CONNECT, 1 identical error(s) suppressed)
If I change BootstrapServers to kafka:9092 I get this error:
Error: kafka:9092/bootstrap: Failed to resolve 'kafka:9092': No such host is known. (after 6817ms in state CONNECT, 7 identical error(s) suppressed)
My docker compose:
version: '3.8'
services:
zookeeper:
#image: "debezium/zookeeper:${DEBEZIUM_VERSION}"
image: "confluentinc/cp-zookeeper:5.5.0"
# ports:
# - 2181:2181
# - 2888:2888
# - 3888:3888
ports:
- 2181:2181
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
#image: "debezium/kafka:${DEBEZIUM_VERSION}"
image: "confluentinc/cp-kafka"
ports:
- 9092:9092
#- 29092:29092
depends_on:
- zookeeper
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_BROKERID=1
- ALLOW_PLAINTEXT_LISTENER="yes"
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
- KAFKA_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
- KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
connect:
image: "debezium/connect:${DEBEZIUM_VERSION}"
ports:
- 8083:8083
depends_on:
- kafka
- zookeeper
environment:
- BOOTSTRAP_SERVERS=kafka:29092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_source_connect_statuses
kafdrop:
image: "obsidiandynamics/kafdrop"
ports:
- 9000:9000
depends_on:
- connect
environment:
- KAFKA_BROKERCONNECT=kafka:29092
and C# code:
var config = new ConsumerConfig
{
BootstrapServers = "kafka:9092",
GroupId = "simple-dotnet-consumer",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnablePartitionEof = true
};
using var consumer = new ConsumerBuilder<string, string>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build();
consumer.Subscribe(new List<string>() { "DESKTOP-DBA3LAO.dbo.CashRegister" });
var start = DateTime.Now;
long messageCounter = 0;
try
{
while (!(Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Q))
{
var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
if (result == null) { continue; }
if (result.IsPartitionEOF) { break; }
++messageCounter;
if (messageCounter % 1024 == 0) { Console.WriteLine($"Received message key: \"{result.Message.Key}\" value: {result.Message.Value}"); }
}
}
catch (OperationCanceledException) { }
consumer.Close(); // commit offset and unsubscribe
var elapsed = DateTime.Now - start;
Console.WriteLine("average throughput: {0:N3} msg/sec, {1} messages over {2:N3} sec", messageCounter / elapsed.TotalSeconds, messageCounter, elapsed.TotalSeconds);