
I have a Node.js microservice and a Kafka broker running in the same Kubernetes cluster.

The Kafka broker and ZooKeeper are running without errors, but I am not sure how to connect to them.

kafka.yaml

# create namespace
apiVersion: v1
kind: Namespace
metadata:
  name: "kafka"
  labels:
    name: "kafka"
---
# create zookeeper service
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  namespace: kafka
spec:
  type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          name: zookeeper
          ports:
            - containerPort: 2181
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-service
  namespace: kafka
spec:
  ports:
  - port: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
      - env:
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          # value: 10.244.0.35:2181
          value: zookeeper-service:2181
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://:9092
        # - name: KAFKA_ADVERTISED_HOST_NAME
        #   value: kafka-broker
        # - name: KAFKA_ADVERTISED_PORT
        #   value: "9092"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://kafka-broker:9092
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        name: kafka-broker
        ports:
        - containerPort: 9092


Connecting using kafka-service:9092 or kafka-broker:9092 doesn't work and leads to a timeout.

kafka.js

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['PLAINTEXT://kafka-broker:9092'], // !!! connection string
})

async function createProducer() {
  const producer = kafka.producer()

  await producer.connect()
  await producer.send({
    topic: 'test-topic',
    messages: [{ value: 'Hello KafkaJS user!' }],
  })

  await producer.disconnect()
}

createProducer()
[auth-pod] {"level":"WARN","timestamp":"2023-03-24T15:35:41.511Z","logger":"kafkajs","message":"KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option \"createPartitioner: Partitioners.LegacyPartitioner\". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable \"KAFKAJS_NO_PARTITIONER_WARNING=1\""}
[auth-pod] Listening on port 3000...
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:41.586Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":0,"retryTime":292}     
[auth-pod] Connected to: mongodb://auth-mongo-srv:27017/auth
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:41.881Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":1,"retryTime":596}     
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:42.479Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":2,"retryTime":1184}    
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:43.665Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":3,"retryTime":2782}    
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:46.449Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":4,"retryTime":5562}    
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:52.015Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":5,"retryTime":12506}   
[auth-pod] node:internal/process/promises:288
[auth-pod]             triggerUncaughtException(err, true /* fromPromise */);
[auth-pod]             ^
[auth-pod]
[auth-pod] KafkaJSNonRetriableError
[auth-pod]   Caused by: KafkaJSConnectionError: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).
[auth-pod]     at /app/node_modules/kafkajs/src/network/connection.js:254:11
[auth-pod]     ... 8 lines matching cause stack trace ...
[auth-pod]     at async createProducer (/app/src/kakfka/connect.js:11:3) {
[auth-pod]   name: 'KafkaJSNumberOfRetriesExceeded',
[auth-pod]   retriable: false,
[auth-pod]   helpUrl: undefined,
[auth-pod]   retryCount: 5,
[auth-pod]   retryTime: 12506,
[auth-pod]   [cause]: KafkaJSConnectionError: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).
[auth-pod]       at /app/node_modules/kafkajs/src/network/connection.js:254:11
[auth-pod]       at new Promise (<anonymous>)
[auth-pod]       at Connection.connect (/app/node_modules/kafkajs/src/network/connection.js:167:12)
[auth-pod]       at ConnectionPool.getConnection (/app/node_modules/kafkajs/src/network/connectionPool.js:56:24)
[auth-pod]       at Broker.connect (/app/node_modules/kafkajs/src/broker/index.js:86:52)
[auth-pod]       at async /app/node_modules/kafkajs/src/cluster/brokerPool.js:93:9
[auth-pod]       at async /app/node_modules/kafkajs/src/cluster/index.js:107:14
[auth-pod]       at async Cluster.connect (/app/node_modules/kafkajs/src/cluster/index.js:146:5)
[auth-pod]       at async Object.connect (/app/node_modules/kafkajs/src/producer/index.js:219:7)
[auth-pod]       at async createProducer (/app/src/kakfka/connect.js:11:3) {
[auth-pod]     retriable: true,
[auth-pod]     helpUrl: undefined,
[auth-pod]     broker: 'PLAINTEXT:NaN',
[auth-pod]     code: undefined,
[auth-pod]     [cause]: undefined
[auth-pod]   }
[auth-pod] }
[auth-pod]
[auth-pod] Node.js v18.15.0
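Judging from the `broker: 'PLAINTEXT:NaN'` field in the log above, kafkajs seems to split each entry of `brokers` on `:` and expects a bare `host:port`. A listener-style `PLAINTEXT://` prefix therefore leaves it with the host `PLAINTEXT` and a non-numeric port. Below is a minimal sketch of a hypothetical helper, `toKafkajsBroker`, that strips such a prefix and validates the port before the address reaches kafkajs. The helper name and its validation rules are assumptions, not part of kafkajs.

```javascript
// Hypothetical helper: turn a Kafka listener string such as
// 'PLAINTEXT://kafka-broker:9092' into the plain 'host:port' form
// that the kafkajs `brokers` option expects.
function toKafkajsBroker(listener) {
  // Drop an optional 'SCHEME://' prefix (PLAINTEXT://, SASL_SSL://, ...)
  const hostPort = listener.replace(/^[A-Za-z_]+:\/\//, '')
  const idx = hostPort.lastIndexOf(':')
  if (idx <= 0) {
    throw new Error(`Missing host or port in broker address: ${listener}`)
  }
  const port = Number(hostPort.slice(idx + 1))
  // Reject the kind of value that produced "Received type number (NaN)"
  if (!Number.isInteger(port) || port <= 0 || port >= 65536) {
    throw new Error(`Invalid port in broker address: ${listener}`)
  }
  return `${hostPort.slice(0, idx)}:${port}`
}
```

For example, `brokers: [toKafkajsBroker('PLAINTEXT://kafka-broker:9092')]` passes `'kafka-broker:9092'` to kafkajs. This only removes the NaN port error; the host name still has to resolve from the client pod.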

If I use the IP of the pod kafka-broker-5c7f7d4f77-nxlwm directly (brokers: ['10.244.0.94:9092']), I also get an error. Using the default namespace instead of a separate one made no difference.

After switching to a StatefulSet based on this answer, I can connect using the IP of kafka-broker-0 ('10.244.0.110:9092'), but I get another error: KafkaJSProtocolError: Replication-factor is invalid. Using the name 'kafka-broker-0:9092' instead leads to the same error as before ("[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout"), and I don't know why DNS resolution would fail.

Based on the following (source):

If you have multiple REST Proxy pods running, Kubernetes will route the traffic to one of them.

I should be able to use the Kubernetes service kafka-service to load-balance requests without hard-coding an IP address. (The service had no targetPort, but it still doesn't work after adding targetPort: 9092, although I am not sure which protocol to use.)
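One way to separate a network problem from a Kafka problem is to check whether the service port accepts TCP connections at all from inside the Node.js pod. Here is a minimal sketch using Node's built-in `net` module; `probeTcp` is a hypothetical helper, and a successful connection says nothing about whether the Kafka protocol or the advertised listeners are configured correctly.

```javascript
const net = require('net')

// Open a plain TCP connection to host:port and report whether it
// succeeded within timeoutMs. This checks reachability only.
function probeTcp(host, port, timeoutMs = 3000) {
  return new Promise((resolve) => {
    let settled = false
    const socket = net.createConnection({ host, port })
    // Resolve exactly once, whichever event fires first
    const finish = (ok, error) => {
      if (settled) return
      settled = true
      socket.destroy()
      resolve({ host, port, ok, error })
    }
    socket.setTimeout(timeoutMs, () => finish(false, 'ETIMEDOUT'))
    socket.once('connect', () => finish(true))
    socket.once('error', (err) => finish(false, err.code))
  })
}
```

For example, running `probeTcp('kafka-service.kafka.svc.cluster.local', 9092).then(console.log)` inside the auth pod shows whether the Service forwards the port at all. A timeout or `ENOTFOUND` here points at the Service or DNS rather than at kafkajs.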


I looked at the logs of the kafka-broker pod and noticed an exception.

[2023-03-24 18:01:25,123] WARN [Controller id=1, targetBrokerId=1] Error connecting to node kafka-broker:9092 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: kafka-broker
 at java.base/java.net.InetAddress$CachedAddresses.get(Unknown Source)
 at java.base/java.net.InetAddress.getAllByName0(Unknown Source)
 at java.base/java.net.InetAddress.getAllByName(Unknown Source)
 at java.base/java.net.InetAddress.getAllByName(Unknown Source)
 at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
 at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:111)
 at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:513)
 at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467)
 at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:172)
 at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:985)
 at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311)
 at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:65)
 at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:292)
 at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:246)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

I think specifying KAFKA_ADVERTISED_LISTENERS should be sufficient (answer), so I suspect a DNS resolution problem.
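Since DNS is the suspect, a quick check is to resolve the candidate names from inside the Node.js pod. Below is a minimal sketch using Node's built-in `dns.promises.lookup`, which goes through the system resolver and therefore honours the pod's `/etc/resolv.conf` search domains. `checkHosts` is a hypothetical helper.

```javascript
const dns = require('dns').promises

// Try to resolve each host name and record the address or the error code.
async function checkHosts(hosts) {
  const results = []
  for (const host of hosts) {
    try {
      const { address } = await dns.lookup(host)
      results.push({ host, ok: true, address })
    } catch (err) {
      results.push({ host, ok: false, error: err.code })
    }
  }
  return results
}
```

For example: `checkHosts(['kafka-broker', 'kafka-service.kafka.svc.cluster.local', 'kafka-broker.kafka.svc.cluster.local']).then(console.table)`. A short name such as `kafka-broker` is only expanded with the querying pod's own namespace, so from the `default` namespace it will not find a service that lives in `kafka`.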

I also tried a headless service: I added clusterIP: "None" and renamed the service to kafka-broker, in case PLAINTEXT://kafka-broker:9092 refers to the service rather than the deployment. That didn't help.

# create namespace
apiVersion: v1
kind: Namespace
metadata:
  name: "kafka"
  labels:
    name: "kafka"
---
# create zookeeper service
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  namespace: kafka
spec:
  type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          name: zookeeper
          ports:
            - containerPort: 2181
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  clusterIP: "None"
  # ports:
  # - protocol: TCP
  #   port: 9092
  #   targetPort: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  # replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
      - env:
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          # value: 10.244.0.35:2181
          value: zookeeper-service:2181
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://:9092
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://kafka-broker:9092
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        name: kafka-broker
        ports:
        - containerPort: 9092


Edit: I'm not sure why I got KafkaJSProtocolError: Replication-factor is invalid, but changing the service as follows prevents it. (It might be because I was using the same name for the service and the deployment. I don't fully understand headless services, but I also added a port.)

# create namespace
apiVersion: v1
kind: Namespace
metadata:
  name: "kafka"
  labels:
    name: "kafka"
---
# create zookeeper service
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  namespace: kafka
spec:
  # type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      # nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          name: zookeeper
          ports:
            - containerPort: 2181
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-srv
  name: kafka-srv
  namespace: kafka
spec:
  # headless service
  clusterIP: "None"
  ports:
  - name: foo
    port: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  # replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
      - env:
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-service:2181
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://:9092
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://kafka-broker:9092
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        name: kafka-broker
        ports:
        - containerPort: 9092

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['10.244.0.64:9092'],
})

async function createProducer() {
  const producer = kafka.producer()

  try {
    await producer.connect()
    console.log('connected', producer)
    // await producer.send({
    //   topic: 'test-topic',
    //   messages: [{ value: 'Hello KafkaJS user!' }],
    // })

    // await producer.disconnect()
  } catch (err) {
    console.log("Couldn't connect to broker")
    console.error(err)
  }
}

createProducer()

[auth-pod] connected {
[auth-pod]   connect: [AsyncFunction: connect],
[auth-pod]   disconnect: [AsyncFunction: disconnect],
[auth-pod]   isIdempotent: [Function: isIdempotent],
[auth-pod]   events: {
[auth-pod]     CONNECT: 'producer.connect',
[auth-pod]     DISCONNECT: 'producer.disconnect',
[auth-pod]     REQUEST: 'producer.network.request',
[auth-pod]     REQUEST_TIMEOUT: 'producer.network.request_timeout',
[auth-pod]     REQUEST_QUEUE_SIZE: 'producer.network.request_queue_size'
[auth-pod]   },
[auth-pod]   on: [Function: on],
[auth-pod]   send: [AsyncFunction: send],
[auth-pod]   sendBatch: [AsyncFunction: sendBatch],
[auth-pod]   transaction: [AsyncFunction: transaction],
[auth-pod]   logger: [Function: getLogger]
[auth-pod] }

Edit 2: Even when I connect successfully using the IP address, the kafka-broker-0 pod logs java.net.UnknownHostException: kafka-broker, and the error keeps repeating. I assumed the pod threw it when kafkajs reached it, but it happens regardless. Giving the service the same name as the advertised host name prevents it:

---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  # namespace: kafka
spec:
  # headless service
  clusterIP: "None"
  ports:
  - name: foo
    port: 9092
  selector:
    app: kafka-broker

I can't connect to the pod kafka-broker-0:9092 directly, but now using the service name kafka-broker:9092 works.

BPDev

2 Answers


From outside k8s, your app cannot resolve cluster DNS names. You'll need to use a ClusterIP / NodePort address (but you've set that to None), and you will also need to advertise that address. Refer to https://strimzi.io/blog/2019/04/17/accessing-kafka-part-1/

  1. Never use IPs for services. This includes the ZooKeeper connect property for Kafka. Ref. https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/

  2. I highly recommend not writing your own Kafka/ZooKeeper specs. Use a Helm chart or an operator such as https://strimzi.io (mentioned in the blog above).

OneCricketeer
  • "From outside k8s" Everything is in the same cluster, for example I can connect to a database using the service name `mongodb://auth-mongo-srv:27017/auth`. It didn't work even before setting the cluster ip to None, but I was trying [this](https://stackoverflow.com/a/57261043/20898396) answer which said to use a headless service. – BPDev Mar 25 '23 at 22:58
  • That answer completely depends on what you set `KAFKA_ADVERTISED_LISTENERS` to. A headless service will not magically allow you to connect – OneCricketeer Mar 25 '23 at 23:00
  • In any case, `UnknownHostException` is a DNS error, not specific to Kubernetes or KafkaJS. So refer to the k8s docs above for how to construct a proper DNS name for a service or pod – OneCricketeer Mar 25 '23 at 23:07

I still don't fully understand headless services, but with the following configuration I can connect using the service name 'kafka-broker.kafka.svc.cluster.local:9092' (see Namespaces and DNS in the Kubernetes docs).

# create namespace
apiVersion: v1
kind: Namespace
metadata:
  name: "kafka"
  labels:
    name: "kafka"
---
# create zookeeper service
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  namespace: kafka
spec:
  # type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      # nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          name: zookeeper
          ports:
            - containerPort: 2181
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  # headless service
  clusterIP: "None"
  ports:
  - name: foo
    port: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  # replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
      - env:
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-service:2181
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://:9092
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://kafka-broker:9092
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        name: kafka-broker
        ports:
        - containerPort: 9092
# TODO add persistent volume
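The fully qualified name used above follows the Kubernetes pattern `<service>.<namespace>.svc.<cluster-domain>`. Individual StatefulSet pods get `<pod>.<service>.<namespace>.svc.<cluster-domain>` only when the StatefulSet's `serviceName` points at the headless service, and the manifest above does not set `serviceName`. Below is a small sketch that builds these names. It assumes the default cluster domain `cluster.local`, and `clusterDns` is a hypothetical helper.

```javascript
// Build in-cluster DNS names. Assumes the default cluster domain
// 'cluster.local'; pass clusterDomain if your cluster uses another one.
function clusterDns({ service, namespace, pod, clusterDomain = 'cluster.local' }) {
  if (!service || !namespace) {
    throw new Error('service and namespace are required')
  }
  const serviceName = `${service}.${namespace}.svc.${clusterDomain}`
  // Per-pod records need a headless service governing the pod
  // (for a StatefulSet: spec.serviceName must name that service).
  return pod ? `${pod}.${serviceName}` : serviceName
}
```

For example, `clusterDns({ service: 'kafka-broker', namespace: 'kafka' }) + ':9092'` gives the broker string used in this answer.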

Edit: There is an issue when using namespaces.
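A possible explanation for the namespace issue, offered as an assumption rather than a confirmed diagnosis: kafkajs uses the seed address only to bootstrap, then connects to the addresses the broker advertises. With KAFKA_ADVERTISED_LISTENERS set to PLAINTEXT://kafka-broker:9092, a client in another namespace can reach the bootstrap FQDN but cannot resolve the short name `kafka-broker`. The sketch below lists advertised brokers that the current pod cannot resolve, using the kafkajs admin client's `describeCluster()`. `findUnresolvableBrokers` is a hypothetical helper, and the admin client and lookup function are injected so the logic can be exercised with stubs.

```javascript
const dns = require('dns').promises

// Ask the cluster which brokers it advertises, then report every advertised
// host name that this pod cannot resolve. `admin` is expected to behave like
// a connected kafkajs admin client (kafka.admin()).
async function findUnresolvableBrokers(admin, lookup = (host) => dns.lookup(host)) {
  const { brokers } = await admin.describeCluster()
  const unresolved = []
  for (const { nodeId, host, port } of brokers) {
    try {
      await lookup(host)
    } catch (err) {
      unresolved.push({ nodeId, address: `${host}:${port}`, error: err.code })
    }
  }
  return unresolved
}
```

With a real client, assuming the admin connection itself succeeds: `const admin = kafka.admin(); await admin.connect(); console.log(await findUnresolvableBrokers(admin)); await admin.disconnect()`. If `kafka-broker:9092` shows up, advertising the fully qualified name (for example PLAINTEXT://kafka-broker.kafka.svc.cluster.local:9092) is one option to try.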

BPDev