I'm having trouble connecting to Kafka from localhost. I'm running Kafka in KRaft mode, without ZooKeeper.

services:
  kafka_auth:
    image: confluentinc/cp-kafka:7.2.1
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://kafka_auth:9092,CONTROLLER://kafka_auth:9093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka_auth:9092,EXTERNAL://127.0.0.1:9094
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka_mqtt:9093,2@kafka_auth:9093,3@kafka_server:9093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    volumes:
      - ./run_workaround.sh:/tmp/run_workaround.sh
    command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
    depends_on:
      - kafka_mqtt
    ports:
      - "9094:9094"

  api:
    image: node:18
    volumes:
      - ./:/app
    command: sh -c "yarn install && yarn start:debug"
    depends_on:
      - kafka_mqtt

When I connect from the api service, it connects without any errors, but when I try to connect from outside Docker using the same code on port 9094, I get an error:

{"level":"ERROR","timestamp":"2023-08-06T01:16:06.815Z","logger":"kafkajs","message":"[BrokerPool] Closed connection","retryCount":27,"retryTime":10000}
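A Kafka client uses the bootstrap address only for its first connection; after that it reconnects to whatever address the broker advertises for the listener it reached. As a rough sketch, the advertised value from the compose file above can be split to see which address each listener hands back. The helper name `parseAdvertisedListeners` is hypothetical and is not part of Kafka or kafkajs.

```javascript
// Sketch only: parseAdvertisedListeners is a hypothetical helper, not a Kafka or kafkajs API.
// Input follows the KAFKA_ADVERTISED_LISTENERS format: "NAME://host:port,NAME://host:port".
function parseAdvertisedListeners(value) {
  const result = {};
  for (const entry of value.split(',')) {
    const trimmed = entry.trim();
    if (!trimmed) continue; // skip empty entries such as a trailing comma
    const sep = trimmed.indexOf('://');
    if (sep === -1) {
      throw new Error(`Malformed listener entry: ${trimmed}`);
    }
    // Map listener name -> advertised "host:port"
    result[trimmed.slice(0, sep)] = trimmed.slice(sep + 3);
  }
  return result;
}

// Value copied from the kafka_auth service definition.
const advertised = parseAdvertisedListeners(
  'PLAINTEXT://kafka_auth:9092,EXTERNAL://127.0.0.1:9094'
);
console.log('EXTERNAL clients are sent to:', advertised.EXTERNAL);
console.log('PLAINTEXT clients are sent to:', advertised.PLAINTEXT);
```

With this value, a client that enters through the EXTERNAL listener is told to reconnect to 127.0.0.1:9094, so that address has to be reachable from wherever the client runs.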

run_workaround.sh

sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure
echo "kafka-storage format --ignore-formatted -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure

Node.js

import {
  Kafka,
  Consumer,
} from 'kafkajs';
const kafka = new Kafka({
  clientId: 'clientId',
  brokers: process.env.KAFKA_BOOTSTRAP_SERVERS.split(','),
  requestTimeout: 3600000,
  retry: {
    maxRetryTime: 10000,
    initialRetryTime: 10000,
    retries: 999999999,
  },
});

const consumer = kafka.consumer({ groupId: 'groupId' });
consumer.connect().then(async () => {
  // here I also check all topics and create them if they don't exist
  await consumer.subscribe({ topics: ['topic1'] });
  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      if (!message?.value) {
        return;
      }
      switch (topic) {
        case 'topic1':
          method(message.value.toString());
          break;
        default:
          break;
      }
    },
  });
});
  • 1) Are you sure Kafka is starting okay? KAFKA_CONTROLLER_QUORUM_VOTERS has 3 entries, but you've only shown one 2) How does your code run if `process.env.KAFKA_BOOTSTRAP_SERVERS` is undefined? And you need to use port 9092 between the two containers. Please show your command that you use outside of Docker to connect – OneCricketeer Aug 07 '23 at 13:04
  • 1) Yes, I can connect to it from the app services and it works fine. I omitted that code because it does not apply to the problem, but the kafka_mqtt and kafka_server services are otherwise identical. 2) Inside the api container, KAFKA_BOOTSTRAP_SERVERS='kafka_mqtt:9092'; when I run it outside of Docker, KAFKA_BOOTSTRAP_SERVERS="127.0.0.1:9094". I use the same code inside and outside the container. – Drew Dru Aug 07 '23 at 15:31
  • `kafka_mqtt` is not the correct DNS name. Did you mean `kafka_auth`? (Also, Kafka is not using MQTT, and you have no "auth" enabled, so these names don't make sense to me.) And it does apply to the problem if you use the incorrect address. Related - https://stackoverflow.com/questions/51630260/connect-to-kafka-running-in-docker – OneCricketeer Aug 07 '23 at 15:36
  • Sorry for the typo. Sure, it's kafka_auth in my code. Thank you, I will try it. – Drew Dru Aug 07 '23 at 15:42
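On the point raised in the comments about `process.env.KAFKA_BOOTSTRAP_SERVERS` being undefined: calling `.split(',')` on an undefined value throws a TypeError before kafkajs is ever involved. A minimal sketch of a guard is shown below. The helper name `parseBrokers` is hypothetical, and the `host:port` check is a simplifying assumption that does not cover IPv6 literals.

```javascript
// Sketch only: parseBrokers is a hypothetical helper, not part of kafkajs.
// Turns a comma-separated bootstrap list into an array of "host:port" strings.
function parseBrokers(raw) {
  if (typeof raw !== 'string' || raw.trim() === '') {
    throw new Error('KAFKA_BOOTSTRAP_SERVERS is not set');
  }
  const brokers = raw
    .split(',')
    .map((b) => b.trim())
    .filter((b) => b.length > 0);
  for (const broker of brokers) {
    // Assumption: every entry is host:port with a numeric port (no IPv6 literals).
    if (!/^[^:\s]+:\d+$/.test(broker)) {
      throw new Error(`Invalid broker address: ${broker}`);
    }
  }
  return brokers;
}

// Example with the two addresses mentioned in the question and comments.
const exampleBrokers = parseBrokers('kafka_auth:9092, 127.0.0.1:9094');
console.log(exampleBrokers);

// In the client config this would replace the bare split, for example:
//   brokers: parseBrokers(process.env.KAFKA_BOOTSTRAP_SERVERS)
```

Failing early with a clear message makes it easier to tell a missing or mistyped address apart from a broker that is actually unreachable.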
