I'm trying to set up Kafka in Kubernetes (GKE) with pods for ZooKeeper, one broker, and two Python apps: a producer and a consumer. However, I can't read from and write to the topic correctly.
I have set up ZooKeeper like this:
```yaml
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  namespace: kafka
spec:
  type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - name: zookeeper
          image: confluentinc/cp-zookeeper:7.3.0
          imagePullPolicy: IfNotPresent
          env:
            - name: ZOOKEEPER_CLIENT_PORT
              value: "2181"
            - name: ZOOKEEPER_TICK_TIME
              value: "2181"
          ports:
            - containerPort: 2181
```
The broker setup looks like this:
```yaml
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-service
  namespace: kafka
spec:
  ports:
    - port: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:7.3.0
          imagePullPolicy: IfNotPresent
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: 10.125.1.238:2181
              #value: zookeeper:2181
            - name: KAFKA_LISTENERS
              value: INSIDE://:9092
            - name: KAFKA_ADVERTISED_LISTENERS
              value: INSIDE://:9092
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: INSIDE:PLAINTEXT
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: INSIDE
          ports:
            - containerPort: 9092
```
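As far as I understand Kafka's behavior, when the host part of an advertised listener is empty (as in `INSIDE://:9092`), the broker fills in the JVM's canonical host name, which in this pod should be `kafka-broker` from the `hostname:` field. The small Python sketch below only illustrates that substitution; `parse_listener` and `advertised_address` are hypothetical helpers, and `socket.getfqdn()` is just a rough stand-in for Java's canonical-host-name lookup, not what Kafka actually calls.

```python
import socket
from typing import Optional, Tuple


def parse_listener(spec: str) -> Tuple[str, str, int]:
    """Split a listener spec such as 'INSIDE://:9092' into (name, host, port)."""
    name, sep, address = spec.partition("://")
    if not sep:
        raise ValueError(f"not a listener spec: {spec!r}")
    host, _, port = address.rpartition(":")
    return name, host, int(port)


def advertised_address(spec: str, default_host: Optional[str] = None) -> str:
    """Approximate the host:port a broker would advertise for one listener.

    An empty host is replaced by default_host, or by this machine's FQDN as a
    rough stand-in for the JVM canonical host name Kafka falls back to.
    """
    _, host, port = parse_listener(spec)
    if not host:
        host = default_host or socket.getfqdn()
    return f"{host}:{port}"


# The broker pod's hostname is "kafka-broker", so the empty host becomes that name.
print(advertised_address("INSIDE://:9092", default_host="kafka-broker"))
# → kafka-broker:9092
```

A listener with an explicit host, for example `INSIDE://kafka-service:9092`, would be returned unchanged as `kafka-service:9092`.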
The IP address for KAFKA_ZOOKEEPER_CONNECT is the ClusterIP of the ZooKeeper service (shown below). I'm not sure whether this is a good long-term solution, but at this point I would be happy just to get the pods communicating internally, and this approach is the closest I have gotten.
```
NAME                TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)          AGE
kafka-service       ClusterIP   10.125.0.7     <none>        9092/TCP         72m
zookeeper-service   NodePort    10.125.1.238   <none>        2181:30181/TCP   75m
```
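Instead of hard-coding a ClusterIP, Kubernetes DNS gives each Service a stable name of the form `<service>.<namespace>.svc.<cluster-domain>`, and pods in the same namespace can also use the short Service name. Note that the commented-out `zookeeper:2181` uses the Deployment name, which as far as I know gets no DNS record; the Service here is called `zookeeper-service`. The sketch below just builds those names; `service_address` is a hypothetical helper, and the default cluster domain `cluster.local` is an assumption about this GKE cluster.

```python
def service_address(service: str, namespace: str, port: int,
                    cluster_domain: str = "cluster.local") -> str:
    """Return the in-cluster DNS address '<service>.<namespace>.svc.<domain>:<port>'."""
    return f"{service}.{namespace}.svc.{cluster_domain}:{port}"


# Service names and ports from the manifests above; "cluster.local" is assumed.
print(service_address("zookeeper-service", "kafka", 2181))
# → zookeeper-service.kafka.svc.cluster.local:2181
print(service_address("kafka-service", "kafka", 9092))
# → kafka-service.kafka.svc.cluster.local:9092
```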
Inside the broker pod I created the topic like this:
```shell
kubectl exec --stdin --tty kafka-broker-54bdd9bdcd-xjz47 -- /bin/bash
kafka-topics --create --bootstrap-server 10.125.0.7:9092 --topic messages
```
Additionally, I have a deployment of two Ubuntu 22.04-based containers with Python 3.9, Kafka (2.13-3.3.1), and kafkacat installed for debugging and development:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: python-deployment
  labels:
    app: python-agent
  namespace: kafka
spec:
  replicas: 2
  template:
    metadata:
      name: python-agent
      labels:
        app: python-agent
    spec:
      containers:
        - name: python-container
          image: gcr.io/xx-xx-xx/ubuntu_kafka_python:v1
          imagePullPolicy: Always
  selector:
    matchLabels:
      app: python-agent
```
On one of the Python pods I'm trying to run a producer (I'm using interactive shells inside each pod to run these manually):
```python
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='10.125.0.7:9092')

if __name__ == '__main__':
    while True:
        producer.send(
            topic="messages",
            key=b"test",
            value=b"Hello"
        )
        time.sleep(1)
```
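One likely reason the producer reports no error: in kafka-python, `KafkaProducer.send()` is asynchronous and returns a future, so a delivery failure (for example, being unable to reach the broker's advertised address) only surfaces if you call `.get()` on that future. A minimal sketch, assuming kafka-python's `send(topic, key=..., value=...)` and `future.get(timeout=...)`; `send_and_confirm` is a hypothetical helper and the 10-second timeout is arbitrary.

```python
from typing import Any, Optional


def send_and_confirm(producer: Any, topic: str, value: bytes,
                     key: Optional[bytes] = None, timeout: float = 10.0) -> Any:
    """Send one record and wait for the broker's acknowledgement.

    kafka-python's KafkaProducer.send() returns a future; .get() blocks until
    the record is acknowledged and re-raises the error if delivery failed.
    """
    future = producer.send(topic, key=key, value=value)
    return future.get(timeout=timeout)


# Example with a real producer (needs a reachable broker):
#   from kafka import KafkaProducer
#   producer = KafkaProducer(bootstrap_servers="10.125.0.7:9092")
#   metadata = send_and_confirm(producer, "messages", b"Hello", key=b"test")
#   print(metadata.topic, metadata.partition, metadata.offset)
```

Calling it in the loop instead of a bare `producer.send(...)` trades throughput for an immediate, visible exception, which is what you want while debugging connectivity.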
The other Python pod runs a consumer script:
```python
from kafka import KafkaConsumer

if __name__ == '__main__':
    consumer = KafkaConsumer('messages', bootstrap_servers='10.125.0.7:9092',
                             auto_offset_reset='earliest')
    for message in consumer:
        print(message.value)
```
With this setup I can start both Python programs without any errors, but the consumer receives no messages.
I tried to debug this with kafkacat. I don't fully understand the output, but maybe it helps to interpret the situation:
```
root@python-deployment-84d5449cd9-h4ztz:/app# kafkacat -b 10.125.0.7:9092 -L
Metadata for all topics (from broker -1: 10.125.0.7:9092/bootstrap):
 1 brokers:
  broker 1001 at kafka-broker:9092 (controller)
 1 topics:
  topic "messages" with 1 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
root@python-deployment-84d5449cd9-h4ztz:/app# kafkacat -b kafka-broker:9092 -L
%3|1670068311.963|FAIL|rdkafka#producer-1| [thrd:kafka-broker:9092/bootstrap]: kafka-broker:9092/bootstrap: Failed to resolve 'kafka-broker:9092': Name or service not known (after 61ms in state CONNECT)
```
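My reading of the first kafkacat call: the bootstrap connection to `10.125.0.7:9092` works, but the returned metadata says broker 1001 lives at `kafka-broker:9092`, and Kafka clients use that advertised address for their actual produce and fetch requests. The sketch below pulls the advertised addresses out of `kafkacat -L` output so they can be checked; `advertised_brokers` is a hypothetical helper, and its regex is based only on the lines shown above, not on a documented output format.

```python
import re
from typing import List, Tuple

# Matches lines such as "  broker 1001 at kafka-broker:9092 (controller)".
_BROKER_LINE = re.compile(r"^\s*broker (\d+) at ([^\s:]+):(\d+)")


def advertised_brokers(metadata: str) -> List[Tuple[int, str, int]]:
    """Extract (broker id, host, port) tuples from `kafkacat -L` output."""
    brokers = []
    for line in metadata.splitlines():
        match = _BROKER_LINE.match(line)
        if match:
            broker_id, host, port = match.groups()
            brokers.append((int(broker_id), host, int(port)))
    return brokers


sample = """Metadata for all topics (from broker -1: 10.125.0.7:9092/bootstrap):
 1 brokers:
  broker 1001 at kafka-broker:9092 (controller)
"""
print(advertised_brokers(sample))
# → [(1001, 'kafka-broker', 9092)]
```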
I also tried replacing the IP `10.125.0.7:9092` in the Python scripts with `kafka-broker:9092`, but then I get this error:
```
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
```
Also, this is what happens when I try to produce with the Kafka console producer directly:
```
kafka-console-producer.sh --topic messages --bootstrap-server 10.125.0.7:9092
>Hello
[2022-12-03 11:46:24,991] WARN [Producer clientId=console-producer] Error connecting to node kafka-broker:9092 (id: 1001 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: kafka-broker: Name or service not known
```
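All three failures (kafkacat, the Python `NoBrokersAvailable`, and the console producer's `UnknownHostException`) look like DNS lookups of `kafka-broker` failing. As far as I know, a pod's `hostname:` field does not create a DNS record on its own (that also needs `subdomain` plus a matching headless Service), while Service names such as `kafka-service` do resolve. A quick way to check from inside a Python pod is a sketch like this; `resolvable` is a hypothetical helper built on the standard `socket.getaddrinfo`.

```python
import socket


def resolvable(host: str) -> bool:
    """Return True if `host` resolves to at least one address from where this runs."""
    try:
        return len(socket.getaddrinfo(host, None)) > 0
    except socket.gaierror:
        return False


# Run inside one of the Python pods, e.g.:
#   for name in ("kafka-broker", "kafka-service"):
#       print(name, resolvable(name))
```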
I got this working in docker-compose with a very similar setup, using the broker container name in place of the IP, but Kubernetes clearly behaves differently.
Any ideas how to get it to work?