1

I'm running Kafka in docker and I've a .NET application that I want to use to consume messages. I've followed following tutorials with no luck:
https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
Connect to Kafka running in Docker
Interact with kafka docker container from outside of docker host
On my consumer application I get the following error if I try to conenct directly to containers ip:

172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21502ms in state CONNECT)
Error: 1/1 brokers are down %3|1620652406.633|FAIL|rdkafka#consumer-1| [thrd:172.21.0.3:9092/bootstrap]: 172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21037ms in state CONNECT, 1 identical error(s) suppressed) 
Error: 172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21037ms in state CONNECT, 1 identical error(s) suppressed)

If I change BootstrapServers to kafka:9092 I get this error:

Error: kafka:9092/bootstrap: Failed to resolve 'kafka:9092': No such host is known.  (after 6817ms in state CONNECT, 7 identical error(s) suppressed)

My docker compose:

version: '3.8'
services:
  zookeeper:
    #image: "debezium/zookeeper:${DEBEZIUM_VERSION}"
    image: "confluentinc/cp-zookeeper:5.5.0"
  #  ports:
  #    - 2181:2181
  #    - 2888:2888
  #    - 3888:3888
    ports:
      - 2181:2181
    environment: 
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    #image: "debezium/kafka:${DEBEZIUM_VERSION}"
    image: "confluentinc/cp-kafka"
    ports:
      - 9092:9092
      #- 29092:29092
    depends_on:
      - zookeeper
    environment:
     - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 
     - KAFKA_BROKERID=1
     - ALLOW_PLAINTEXT_LISTENER="yes"
     - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
     - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
     - KAFKA_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
     - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
     - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1

  connect:
    image: "debezium/connect:${DEBEZIUM_VERSION}"
    ports: 
      - 8083:8083
    depends_on:
      - kafka
      - zookeeper
    environment:
      - BOOTSTRAP_SERVERS=kafka:29092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_source_connect_statuses

  kafdrop:
    image: "obsidiandynamics/kafdrop"
    ports:
      - 9000:9000
    depends_on: 
      - connect
    environment: 
      - KAFKA_BROKERCONNECT=kafka:29092

and C# code:

 var config = new ConsumerConfig
            {
                BootstrapServers = "kafka:9092",
                GroupId = "simple-dotnet-consumer",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnablePartitionEof = true
            };
            using var consumer = new ConsumerBuilder<string, string>(config)
                .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                .Build();

            consumer.Subscribe(new List<string>() { "DESKTOP-DBA3LAO.dbo.CashRegister" });
            var start = DateTime.Now;
            long messageCounter = 0;
            try
            {
                while (!(Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Q))
                {
                    var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
                    if (result == null) { continue; }
                    if (result.IsPartitionEOF) { break; }

                    ++messageCounter;
                    if (messageCounter % 1024 == 0) { Console.WriteLine($"Received message key: \"{result.Message.Key}\" value: {result.Message.Value}"); }
                }
            }
            catch (OperationCanceledException) { }

            consumer.Close();  // commit offset and unsubscribe

            var elapsed = DateTime.Now - start;
            Console.WriteLine("average throughput: {0:N3} msg/sec, {1} messages over {2:N3} sec", messageCounter / elapsed.TotalSeconds, messageCounter, elapsed.TotalSeconds);
        
  • You asked a similar question over here: https://stackoverflow.com/questions/67433139/connect-to-kafka-running-in-docker-from-host-machine :) Did you try it with the `confluentinc/cp-kafka` images? – Robin Moffatt May 10 '21 at 15:10
  • Yes, as you can see in the new question I switched it up :) – AverageProgrammer May 10 '21 at 16:55
  • I'm going to keep marking this as a duplicate like your last question. Your code is not inside a container. Don't use `kafka:9092` as the connection string unless it is. And as mentioned in my previous comment, exec into the container and look what addresses are actually advertised if you want to debug (hint: two advertised listeners with the same hostname is not correct, and that configuration is not stated in the posts you've linked as a valid config ) – OneCricketeer May 11 '21 at 02:32
  • Specifically, see the section in the duplicate titled "Client on same machine, not in a container", and refer to the variables mentioned above it – OneCricketeer May 11 '21 at 02:40

2 Answers2

5

You need to configure the listeners correctly. At the moment you are advertising the broker as being accessible only at hostname kafka which as @mm8 says is only going to be valid within the Docker network.

     - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
     - KAFKA_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092

You need to have a listener which will advertise itself as localhost (or whatever hostname your code will be able to connect to it on), for example:

     - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
     - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092

Now amend your client application to use localhost and not kafka:

BootstrapServers = "localhost:9092",

This is all covered in my blog, if you read it carefully :)

Robin Moffatt
  • 30,382
  • 3
  • 65
  • 92
1

If you are running your consuming .NET app outside of Docker, you should try to connect to localhost:9092. The kafka hostname is only valid in Docker.

You'll find an example of how to run Kafka in Docker and consume the messages from it using a .NET Core app here.

You could compare the docker-compose.yml from that example with yours.

Here is how the the .NET Core app sets up the consumer:

using IConsumer<Null, YourMessageType> consumer =
    new ConsumerBuilder<Null, YourMessageType>(
        new ConsumerConfig
        {
            GroupId = Guid.NewGuid().ToString(),
            BootstrapServers = "localhost:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest
        })
        .SetValueDeserializer(new AvroDeserializer<YourMessageType>(schemaRegistry).AsSyncOverAsync())
        .Build();
mm8
  • 163,881
  • 10
  • 57
  • 88