
I'm trying to set up Kafka in Kubernetes (GKE) with pods for ZooKeeper, one broker, and two Python apps (a producer and a consumer). However, I'm not able to read from and write to the topic correctly.

I have set up zookeeper like this:

apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  namespace: kafka
spec:
  type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:    
      - name: zookeeper  
        image: confluentinc/cp-zookeeper:7.3.0
        imagePullPolicy: IfNotPresent
        env:
        - name: ZOOKEEPER_CLIENT_PORT
          value: "2181"
        - name: ZOOKEEPER_TICK_TIME
          value: "2181"  
        ports:
        - containerPort: 2181

The broker setup looks like this:

apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-service
  namespace: kafka
spec:
  ports:
  - port: 9092
  selector:
    app: kafka-broker
---

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
      - name: kafka
        image: confluentinc/cp-kafka:7.3.0
        imagePullPolicy: IfNotPresent
        env:
          - name: KAFKA_ZOOKEEPER_CONNECT
            value: 10.125.1.238:2181
            #value: zookeeper:2181
          - name: KAFKA_LISTENERS
            value: INSIDE://:9092
          - name: KAFKA_ADVERTISED_LISTENERS
            value: INSIDE://:9092
          - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
            value: INSIDE:PLAINTEXT
          - name: KAFKA_INTER_BROKER_LISTENER_NAME
            value: INSIDE
        ports:
        - containerPort: 9092

The IP address for KAFKA_ZOOKEEPER_CONNECT is the ClusterIP of the ZooKeeper service. I'm not sure whether this is a good or long-term solution, but at this point I would be happy just to get the pods communicating internally, and this method is the closest I have gotten.

NAME                TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)          AGE
kafka-service       ClusterIP   10.125.0.7     <none>        9092/TCP         72m
zookeeper-service   NodePort    10.125.1.238   <none>        2181:30181/TCP   75m
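As an alternative to hard-coding the ClusterIPs listed above, a Service can usually be addressed by its in-cluster DNS name. The following is a minimal sketch of how those names are formed, assuming the cluster uses the default `cluster.local` domain; the helper names `service_dns_name` and `bootstrap_address` are hypothetical and only for illustration.

```python
def service_dns_name(service, namespace, cluster_domain="cluster.local"):
    """Build the in-cluster DNS name of a Kubernetes Service.

    Assumes the default cluster domain; a cluster configured with a
    different domain would need that value passed in instead.
    """
    return f"{service}.{namespace}.svc.{cluster_domain}"


def bootstrap_address(service, namespace, port):
    """Combine the Service DNS name with a port, e.g. for bootstrap_servers."""
    return f"{service_dns_name(service, namespace)}:{port}"


if __name__ == "__main__":
    # Service names and namespace are taken from the manifests in this question.
    print(bootstrap_address("zookeeper-service", "kafka", 2181))
    print(bootstrap_address("kafka-service", "kafka", 9092))
```

Under that assumption, the resulting strings could stand in for `10.125.1.238:2181` in KAFKA_ZOOKEEPER_CONNECT and for `10.125.0.7:9092` in the clients, so the configuration would not depend on IPs that change when a Service is recreated.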

Inside the broker I have set up a topic like this:

kubectl exec --stdin --tty kafka-broker-54bdd9bdcd-xjz47 -- /bin/bash
kafka-topics --create --bootstrap-server 10.125.0.7:9092 --topic messages

Additionally, I have a deployment of two Ubuntu (22.04) based containers with Python (3.9), Kafka (2.13-3.3.1), and Kafkacat installed for debugging and development:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: python-deployment
  labels:
    app: python-agent
  namespace: kafka
spec:
  replicas: 2
  template: 
    metadata:
      name: python-agent
      labels:
        app: python-agent
    spec:
      containers:
        - name: python-container
          image: gcr.io/xx-xx-xx/ubuntu_kafka_python:v1
          imagePullPolicy: Always       
  selector:
    matchLabels:
      app: python-agent

On one of the Python pods I'm trying to run a producer (I'm using interactive shells inside each pod to run these scripts manually):

import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='10.125.0.7:9092')

if __name__ == '__main__':
    while True:
        producer.send(
            topic="messages",
            key=b"test",
            value=b"Hello"
        )
        time.sleep(1)

The other Python pod runs a consumer script:

from kafka import KafkaConsumer


if __name__ == '__main__':
    consumer = KafkaConsumer('messages', bootstrap_servers='10.125.0.7:9092',
                             auto_offset_reset='earliest')

    for message in consumer:
        print(message.value)

With this setup I can start both Python programs without any errors, but the consumer receives no messages.

I tried to debug the situation with Kafkacat. I don't fully understand what the output means, but maybe it will help in interpreting the situation:

root@python-deployment-84d5449cd9-h4ztz:/app# kafkacat -b 10.125.0.7:9092 -L
Metadata for all topics (from broker -1: 10.125.0.7:9092/bootstrap):
 1 brokers:
  broker 1001 at kafka-broker:9092 (controller)
 1 topics:
  topic "messages" with 1 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001

root@python-deployment-84d5449cd9-h4ztz:/app# kafkacat -b kafka-broker:9092 -L
%3|1670068311.963|FAIL|rdkafka#producer-1| [thrd:kafka-broker:9092/bootstrap]: kafka-broker:9092/bootstrap: Failed to resolve 'kafka-broker:9092': Name or service not known (after 61ms in state CONNECT)
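The two outputs above suggest that the bootstrap connection through the Service IP works, but the broker then advertises itself as `kafka-broker:9092`, a name the client pod cannot resolve. The sketch below is one way to check this from inside a client pod using only the Python standard library: it parses the `broker ... at host:port` lines from `kafkacat -L` output and tests whether each advertised host resolves. The function names `advertised_brokers` and `resolves` are hypothetical, and the regular expression assumes the output layout shown above.

```python
import re
import socket

# Matches kafkacat -L lines such as "  broker 1001 at kafka-broker:9092 (controller)".
BROKER_LINE = re.compile(r"^\s*broker\s+(\d+)\s+at\s+([^\s:]+):(\d+)", re.MULTILINE)


def advertised_brokers(metadata_text):
    """Return (broker_id, host, port) tuples parsed from `kafkacat -L` output."""
    return [
        (int(broker_id), host, int(port))
        for broker_id, host, port in BROKER_LINE.findall(metadata_text)
    ]


def resolves(host):
    """Return True if this machine can resolve `host` to an address."""
    try:
        socket.getaddrinfo(host, None)
        return True
    except socket.gaierror:
        return False


if __name__ == "__main__":
    # Sample copied from the kafkacat output in this question.
    sample = """\
Metadata for all topics (from broker -1: 10.125.0.7:9092/bootstrap):
 1 brokers:
  broker 1001 at kafka-broker:9092 (controller)
"""
    for broker_id, host, port in advertised_brokers(sample):
        status = "resolvable" if resolves(host) else "NOT resolvable"
        print(f"broker {broker_id}: {host}:{port} is {status}")
```

If the advertised host turns out not to be resolvable from the client pod, that would be consistent with the clients connecting for metadata and then failing to reach the broker for the actual produce and fetch requests.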

I have tried replacing the IP 10.125.0.7:9092 in the Python scripts with "kafka-broker:9092", but then I get an error:

kafka.errors.NoBrokersAvailable: NoBrokersAvailable

Also, this is what happens when I try to produce with Kafka's console producer directly:

kafka-console-producer.sh --topic messages --bootstrap-server 10.125.0.7:9092
>Hello
[2022-12-03 11:46:24,991] WARN [Producer clientId=console-producer] Error connecting to node kafka-broker:9092 (id: 1001 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: kafka-broker: Name or service not known

I was able to get this working in docker-compose with a very similar setup, where I simply used the broker's container name in place of the IP, but Kubernetes clearly behaves a bit differently.

Any ideas how to get it to work?

Kalleni
  • First, never use IP addresses in Kubernetes. Always use FQDN service names. For example `zookeeper-service.kafka.svc.cluster.local`... Only services have resolvable hostnames, not Deployments. Secondly, make sure you understand what `advertised.listeners` actually does... Then, try using Strimzi Kafka Operator rather than run your own (misconfigured) Deployment files – OneCricketeer Dec 03 '22 at 17:40
  • Ok, thanks for the tips. I'll look into these. – Kalleni Dec 03 '22 at 19:18
