A few things to know:
- I'm using a Raspberry Pi 4.
- It runs an Ubuntu 20.04 image (ARM64).
- I use ZeroTier and SSH to connect to the Pi remotely.
- I was able to run five containers: Node-RED, Mosquitto, Portainer, ZooKeeper and Kafka.
Here's the docker-compose:
zookeeper:
  image: ubuntu/zookeeper
  container_name: zookeeper
  ports:
    - "2181:2181"
kafka:
  image: ubuntu/kafka
  container_name: kafka
  ports:
    - "9092:9092"
  environment:
    - KAFKA_ADVERTISED_HOST_NAME=kafka
    - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    - KAFKA_CREATE_TOPICS= "mqtt-sensor-1:1:1"
    - KAFKA_DELETE_TOPIC_ENABLE=true
  depends_on:
    - zookeeper
  restart: on-failure
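One detail in the environment list is worth a second look: the KAFKA_CREATE_TOPICS entry has a space after the `=` and quotes around the value. The sketch below is only an illustration, assuming each list entry is split at the first `=` and the remainder is passed to the container unchanged; `inspect_env` is a hypothetical helper, and whether the ubuntu/kafka image reads this variable at all is not confirmed here.

```python
def inspect_env(entries):
    """Split 'KEY=value' entries on the first '=' and flag suspicious values."""
    findings = {}
    for entry in entries:
        key, _, value = entry.partition("=")
        problems = []
        if value != value.strip():
            problems.append("leading/trailing whitespace")
        if '"' in value or "'" in value:
            problems.append("literal quote characters")
        findings[key] = (value, problems)
    return findings


# The entries exactly as written in the compose file above
ENV = [
    "KAFKA_ADVERTISED_HOST_NAME=kafka",
    "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
    'KAFKA_CREATE_TOPICS= "mqtt-sensor-1:1:1"',
    "KAFKA_DELETE_TOPIC_ENABLE=true",
]

for key, (value, problems) in inspect_env(ENV).items():
    if problems:
        print(f"{key}: {value!r} -> {', '.join(problems)}")
```

Under that assumption, only the KAFKA_CREATE_TOPICS entry is flagged, because its value would reach the container with a leading space and the quote characters included.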
So far, I've been able to publish and subscribe over MQTT without any issues. Here's my publisher code, which works:
import sys
import time
from random import uniform

import paho.mqtt.client as mqtt

client = mqtt.Client("GreenForges v1")

# connect() returns 0 on success
if client.connect("localhost", 1883, 60) != 0:
    print("could not connect to MQTT broker!")
    sys.exit(-1)

while True:
    # Simulated sensor reading between 20.0 and 21.0
    randNumber = uniform(20.0, 21.0)
    client.publish("mqtt-sensor-1", randNumber)
    print("Just published " + str(randNumber) + " to Topic mqtt-sensor-1")
    time.sleep(10)
I've been trying to write the bridge in Python 3 with the pykafka library, but I keep getting an exception. Here's the MQTT-to-Kafka bridge code:
import time

import paho.mqtt.client as mqtt
from pykafka import KafkaClient

mqtt_client = mqtt.Client("BridgeMQTT3Kafka")
mqtt_client.connect("localhost", 1883, 60)

kafka_client = KafkaClient(hosts='localhost:9092')
kafka_topic = kafka_client.topics['mqtt-sensor-1']
kafka_producer = kafka_topic.get_sync_producer()

def on_message(client, userdata, message):
    # message.payload is bytes; decode it rather than taking str() of the bytes object
    msg_payload = message.payload.decode('ascii')
    print("Received MQTT message: ", msg_payload)
    kafka_producer.produce(msg_payload.encode('ascii'))
    print("KAFKA: Just published " + msg_payload + " to topic mqtt-sensor-1")

# Register the callback before starting the network loop
mqtt_client.on_message = on_message
mqtt_client.loop_start()
mqtt_client.subscribe("mqtt-sensor-1")
time.sleep(300)
mqtt_client.loop_stop()  # paho's counterpart to loop_start(); there is no loop_end()
It always fails at the kafka_producer line, and only returns this error:
File "/home/green/.local/lib/python3.10/site-packages/pykafka/broker.py", line 44, in wrapped
raise SocketDisconnectedError
pykafka.exceptions.SocketDisconnectedError
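Since the traceback is the only output, one way to gather a little more signal is to retry the failing call a few times and log each attempt, which would help tell a broker that is still starting up from one that is never reachable. This is a minimal sketch, not part of pykafka: `with_retries` and its parameters are hypothetical names introduced for illustration.

```python
import time


def with_retries(action, exceptions=(Exception,), attempts=5, delay=2.0):
    """Call action() up to `attempts` times, sleeping `delay` seconds after each failure."""
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except exceptions as exc:
            # Log the exception type and message for every failed attempt
            print(f"attempt {attempt}/{attempts} failed: {type(exc).__name__}: {exc}")
            if attempt == attempts:
                raise  # give up and surface the last error unchanged
            time.sleep(delay)
```

In the bridge, whichever call raises could be wrapped, for example `with_retries(kafka_topic.get_sync_producer, exceptions=(SocketDisconnectedError,))` after importing `SocketDisconnectedError` from `pykafka.exceptions`. If every attempt fails the same way, a startup race becomes a less likely explanation.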
Any pointers? I had a quick look at the Kafka logs.
Controller.log
[2022-09-09 15:45:16,117] INFO [Controller id=0] New partition creation callback for mqtt-sensor-1-0,mqtt-sensor-1-1 (kafka.controller.KafkaController)
[2022-09-09 15:47:59,406] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2022-09-09 15:47:59,406] TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
[2022-09-09 15:47:59,417] DEBUG [Controller id=0] Topics not in preferred replica for broker 0 Map() (kafka.controller.KafkaController)
[2022-09-09 15:47:59,423] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)
server.log
[2022-09-09 15:45:16,001] INFO Creating topic mqtt-sensor-1 with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(0), 1 -> ArrayBuffer(0)) (kafka.zk.AdminZkClient)
[2022-09-09 15:45:16,344] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(mqtt-sensor-1-1, mqtt-sensor-1-0) (kafka.server.ReplicaFetcherManager)
[2022-09-09 15:45:16,596] INFO [LogLoader partition=mqtt-sensor-1-1, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2022-09-09 15:45:16,649] INFO Created log for partition mqtt-sensor-1-1 in /tmp/kafka-logs/mqtt-sensor-1-1 with properties {} (kafka.log.LogManager)
[2022-09-09 15:45:16,658] INFO [Partition mqtt-sensor-1-1 broker=0] No checkpointed highwatermark is found for partition mqtt-sensor-1-1 (kafka.cluster.Partition)
[2022-09-09 15:45:16,662] INFO [Partition mqtt-sensor-1-1 broker=0] Log loaded for partition mqtt-sensor-1-1 with initial high watermark 0 (kafka.cluster.Partition)
[2022-09-09 15:45:16,722] INFO [LogLoader partition=mqtt-sensor-1-0, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2022-09-09 15:45:16,731] INFO Created log for partition mqtt-sensor-1-0 in /tmp/kafka-logs/mqtt-sensor-1-0 with properties {} (kafka.log.LogManager)
[2022-09-09 15:45:16,732] INFO [Partition mqtt-sensor-1-0 broker=0] No checkpointed highwatermark is found for partition mqtt-sensor-1-0 (kafka.cluster.Partition)
[2022-09-09 15:45:16,734] INFO [Partition mqtt-sensor-1-0 broker=0] Log loaded for partition mqtt-sensor-1-0 with initial high watermark 0 (kafka.cluster.Partition)
I dug deeper and checked whether the broker was actually running.
./bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
Connecting to localhost:2181
[2022-09-09 16:18:36,326] WARN Session 0x0 for sever localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:344)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1290)
KeeperErrorCode = ConnectionLoss for /brokers/ids
[2022-09-09 16:18:36,482] ERROR Exiting JVM with code 1 (org.apache.zookeeper.util.ServiceUtils)
./bin/zookeeper-shell.sh localhost:9092 ls /brokers/ids
Connecting to localhost:9092
[2022-09-09 16:19:44,858] WARN Client session timed out, have not heard from server in 30006ms for session id 0x0 (org.apache.zookeeper.ClientCnxn)
[2022-09-09 16:19:44,867] WARN Session 0x0 for sever localhost/127.0.0.1:9092, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
org.apache.zookeeper.ClientCnxn$SessionTimeoutException: Client session timed out, have not heard from server in 30006ms for session id 0x0
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1258)
KeeperErrorCode = ConnectionLoss for /brokers/ids
[2022-09-09 16:19:45,018] ERROR Exiting JVM with code 1 (org.apache.zookeeper.util.ServiceUtils)
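The two attempts above fail differently: a refused connection on port 2181 and a session timeout on port 9092. A plain TCP probe, run from the same shell that runs the bridge, could help separate "nothing is listening" from "the name does not resolve". The sketch below uses only the standard library; `probe` is a hypothetical helper, and it says nothing about which protocol the listener speaks.

```python
import socket


def probe(host: str, port: int, timeout: float = 3.0) -> str:
    """Return 'open', 'unresolved', or 'closed' for a TCP endpoint."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except socket.gaierror:
        # The hostname could not be resolved from this machine
        return "unresolved"
    except OSError:
        # Connection refused, timed out, or otherwise unreachable
        return "closed"
```

Calling `probe("localhost", 2181)`, `probe("localhost", 9092)` and `probe("kafka", 9092)` would cover the published ports plus the hostname set in KAFKA_ADVERTISED_HOST_NAME. Checking `kafka` assumes the broker hands that name back to clients, which is not confirmed by the logs shown here.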
After further testing, I'm even more confused: the console producer and consumer work against the cluster, and the consumer receives the values:
bin/kafka-console-consumer.sh --topic mqtt-sensor-1 --from-beginning --bootstrap-server localhost:9092
bin/kafka-console-producer.sh --topic mqtt-sensor-1 --bootstrap-server localhost:9092