0

Couple of things to know:

  • Using a raspberry pi 4
  • Running Unbuntu 20.04 image on ri4 (ARM64)
  • I use ZeroTier and SSH to connect remotely to the ri4
  • I was able to run 3 containers: nodered, mosquito, portainer, zookeeper and Kafka.

Here's the docker-compose:

  zookeeper:
    image: ubuntu/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: ubuntu/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS= "mqtt-sensor-1:1:1"
      - KAFKA_DELETE_TOPIC_ENABLE=true
    depends_on:
      - zookeeper
    restart: on-failure

So far, I was able to pub/sub with MQTT without any issues. Here's my publisher code - it's working:

import  paho.mqtt.client as mqtt
from random import randrange, uniform
import time

client= mqtt.Client("GreenForges v1")

if client.connect("localhost",1883,60) !=0:
                print("could not connect to MQTT broker!")
                sys.exit(-1)
while True:
    randNumber = uniform(20.0, 21.0)
    client.publish("mqtt-sensor-1", randNumber)
    print("Just published " + str(randNumber) + " to Topic mqtt-sensor-1")
    time.sleep(10)

I've been trying to code the bridge with python3 with the pykafka library, but I keep getting an exception. Here's the mqtt bridge code to Kafka:

import paho.mqtt.client as mqtt
from pykafka import KafkaClient
import time

mqtt_client = mqtt.Client("BridgeMQTT3Kafka")
mqtt_client.connect("localhost",1883,60)

kafka_client = KafkaClient(hosts='localhost:9092')
kafka_topic = kafka_client.topics['mqtt-sensor-1']
kafka_producer = kafka_topic.get_sync_producer()

def on_message(client, userdata, message):
    msg_payload = str(message.payload)
    print("Received MQTT message: ", msg_payload)
    kafka_producer.produce(str(msg_payload).encode('ascii'))
    print("KAFKA: Just published " + str(msg_payload) + " to topic mqtt-sensor-1")

mqtt_client.loop_start()
mqtt_client.subscribe("mqtt-sensor-1")
mqtt_client.on_message = on_message
time.sleep(300)
mqtt_client.loop_end()

It always fails at the kafka_producer line, and only returns this error:

File "/home/green/.local/lib/python3.10/site-packages/pykafka/broker.py", line 44, in wrapped
    raise SocketDisconnectedError
pykafka.exceptions.SocketDisconnectedError

Any pointers? I had a quick look at the Kafka logs.

Controller.log

[2022-09-09 15:45:16,117] INFO [Controller id=0] New partition creation callback for mqtt-sensor-1-0,mqtt-sensor-1-1 (kafka.controller.KafkaController)
[2022-09-09 15:47:59,406] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2022-09-09 15:47:59,406] TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
[2022-09-09 15:47:59,417] DEBUG [Controller id=0] Topics not in preferred replica for broker 0 Map() (kafka.controller.KafkaController)
[2022-09-09 15:47:59,423] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)

server.log

[2022-09-09 15:45:16,001] INFO Creating topic mqtt-sensor-1 with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(0), 1 -> ArrayBuffer(0)) (kafka.zk.AdminZkClient)
[2022-09-09 15:45:16,344] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(mqtt-sensor-1-1, mqtt-sensor-1-0) (kafka.server.ReplicaFetcherManager)
[2022-09-09 15:45:16,596] INFO [LogLoader partition=mqtt-sensor-1-1, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2022-09-09 15:45:16,649] INFO Created log for partition mqtt-sensor-1-1 in /tmp/kafka-logs/mqtt-sensor-1-1 with properties {} (kafka.log.LogManager)
[2022-09-09 15:45:16,658] INFO [Partition mqtt-sensor-1-1 broker=0] No checkpointed highwatermark is found for partition mqtt-sensor-1-1 (kafka.cluster.Partition) 
[2022-09-09 15:45:16,662] INFO [Partition mqtt-sensor-1-1 broker=0] Log loaded for partition mqtt-sensor-1-1 with initial high watermark 0 (kafka.cluster.Partition)
[2022-09-09 15:45:16,722] INFO [LogLoader partition=mqtt-sensor-1-0, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2022-09-09 15:45:16,731] INFO Created log for partition mqtt-sensor-1-0 in /tmp/kafka-logs/mqtt-sensor-1-0 with properties {} (kafka.log.LogManager)
[2022-09-09 15:45:16,732] INFO [Partition mqtt-sensor-1-0 broker=0] No checkpointed highwatermark is found for partition mqtt-sensor-1-0 (kafka.cluster.Partition) 
[2022-09-09 15:45:16,734] INFO [Partition mqtt-sensor-1-0 broker=0] Log loaded for partition mqtt-sensor-1-0 with initial high watermark 0 (kafka.cluster.Partition)

I digged more and checked if it was actually running.

./bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

Connecting to localhost:2181
[2022-09-09 16:18:36,326] WARN Session 0x0 for sever localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused
        at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:344)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1290)
KeeperErrorCode = ConnectionLoss for /brokers/ids
[2022-09-09 16:18:36,482] ERROR Exiting JVM with code 1 (org.apache.zookeeper.util.ServiceUtils)

./bin/zookeeper-shell.sh localhost:9092 ls /brokers/ids

Connecting to localhost:9092
[2022-09-09 16:19:44,858] WARN Client session timed out, have not heard from server in 30006ms for session id 0x0 (org.apache.zookeeper.ClientCnxn)
[2022-09-09 16:19:44,867] WARN Session 0x0 for sever localhost/127.0.0.1:9092, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
org.apache.zookeeper.ClientCnxn$SessionTimeoutException: Client session timed out, have not heard from server in 30006ms for session id 0x0
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1258)
KeeperErrorCode = ConnectionLoss for /brokers/ids
[2022-09-09 16:19:45,018] ERROR Exiting JVM with code 1 (org.apache.zookeeper.util.ServiceUtils)

After further testing, I'm getting more confused... I was able to test producer/consumer and it works on the cluster - I get the value to the consumer:

bin/kafka-console-consumer.sh --topic mqtt-sensor-1 --from-beginning --bootstrap-server localhost:9092
bin/kafka-console-producer.sh --topic mqtt-sensor-1 --bootstrap-server localhost:9092
OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
fneron
  • 1,057
  • 3
  • 15
  • 39
  • All that error says is that the connection to the kafka instance was closed. What do the logs in the kafka container say? It's hard to say more without looking at what is happening around line 44 in the `pykafka/broker.py` file – hardillb Sep 09 '22 at 07:47
  • Where you want me to look? I checked opt/kafka/logs and there's nothing out of ordinary... Updated with question with logs – fneron Sep 09 '22 at 15:51
  • I was expecting some logging of client connection/disconnection, but appears not. So you'll have to poke round the line the error was thrown from to see if is has any indication as to why/what it was doing at the time. – hardillb Sep 09 '22 at 15:58
  • 1
    Updated - it seems my Kafka is not even running. – fneron Sep 09 '22 at 16:23
  • 1) zookeeper-shell cannot connect to Kafka. Seems your Zookeeper is down as well... 2) Note: pykafka is deprecated / unmaintained. Also, there are projects on the web using **Kafka Connect** for taking MQTT to Kafka – OneCricketeer Sep 10 '22 at 02:00
  • 1
    The Python code running on your host won't work because `KAFKA_ADVERTISED_HOST_NAME=kafka` rather than set to `localhost` – OneCricketeer Sep 12 '22 at 13:59
  • Oh that definitely might be it... I'll test it out and confirm. – fneron Sep 12 '22 at 15:20

0 Answers0