My script reads data from an MQTT server and writes it to a Postgres table.
I'm using loop_forever(); the program is supposed to run nonstop.

After the first connection everything works fine, but after some time (from minutes to days) on_connect() is called again. The program keeps running (in the sense that no connection error is raised), but no messages are received any more.

To debug this, I tried the following:

  • inducing a disconnect by switching the network connection off and on
  • shutting the server down and starting it again
  • calling client.disconnect()

To my surprise, the first and second attempts did nothing: there were no log entries about a new connection, and the program simply kept running once the connection came back.
The third attempt was unsuccessful; I couldn't make it work.

Other remarks:

  • I tried using loop_start() instead of loop_forever(), but had no success with that at all

So basically the questions are:

  • how do I counteract this?
  • how do I disconnect manually to replicate the problem of on_connect() being called again (and incoming data being lost)?

My code:

import json
import sys
from paho.mqtt import client as mqtt_client
import psycopg2
import logging as log
from datetime import datetime
import certifi
from collections import defaultdict


def connect_mqtt(userdict) -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        log.info(f"{datetime.now()}: Trying connect")
        if rc == 0:
            log.info(f"{datetime.now()}: Connection returned result: " + mqtt_client.connack_string(rc))
        else:
            log.info("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id=conf['client_id'], protocol=mqtt_client.MQTTv31, userdata=userdict)
    client.tls_set(certifi.where())
    client.tls_insecure_set(True)

    client.username_pw_set(conf['username'], conf['password'])
    client.on_connect = on_connect
    client.connect(conf['broker'], conf['port'])
    return client


def on_message(client, userdata, msg):
    now_ts_in_s = round(datetime.timestamp(datetime.now()))
    now_dt_in_s = datetime.fromtimestamp(now_ts_in_s)

    try:
        value = float(msg.payload.decode())
        data = [now_dt_in_s,  value]
        insert_to_psql(userdata['conn'], data)
    except ValueError:
        pass


def insert_to_psql(conn, data):
    cursor = conn.cursor()
    insert_query = "INSERT INTO data (time, value) VALUES (%s, %s) ON CONFLICT " \
                   "DO " \
                   "NOTHING;"
    cursor.execute(insert_query, data)
    conn.commit()


def run():
    psql_conn = "postgres://postgres:blablabla"
    conn = psycopg2.connect(psql_conn)

    userdict = {'collected_data': defaultdict(list), 'conn': conn, 'first_conn': True}
    client = connect_mqtt(userdict)
    client.subscribe(conf['topic'])
    client.on_message = on_message
    try:
        client.loop_forever()
    finally:
        client.disconnect()
        conn.close()


if __name__ == '__main__':
    with open(sys.argv[1]) as f:
        conf = json.load(f)
    run()
Daniel R

2 Answers

1

If on_connect is called again, then a disconnect probably happened before that, possibly because of a temporary network issue. You should also set the on_disconnect callback so that you can see when it happens.

Note that, because of this, it is important to subscribe inside the on_connect callback and not just once outside it. When paho disconnects and connects again, it won't resubscribe automatically. That's why subscriptions should be made in the on_connect callback.
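A minimal sketch of that advice, assuming the paho-mqtt 1.x callback signatures used in the question. The TOPIC value and the attach_callbacks helper are hypothetical names for illustration; in the question the topic comes from conf['topic'].

```python
import logging as log

TOPIC = "sensors/value"  # hypothetical; the question reads it from conf['topic']


def on_connect(client, userdata, flags, rc):
    # Runs after every successful (re)connect, so the subscription is
    # re-established each time instead of only once at startup.
    if rc == 0:
        log.info("Connected, subscribing to %s", TOPIC)
        client.subscribe(TOPIC)
    else:
        log.warning("Connect failed, return code %d", rc)


def on_disconnect(client, userdata, rc):
    # A non-zero rc means the disconnect was unexpected (e.g. network loss).
    if rc != 0:
        log.warning("Unexpected disconnect, return code %d", rc)


def attach_callbacks(client):
    # Hypothetical helper: wire both callbacks onto an existing client.
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    return client
```

With this in place, the client.subscribe(...) call in run() is no longer needed, and the on_disconnect log line should show whether each repeated on_connect was preceded by a dropped connection.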

As for your question about how to test this: you can run a local MQTT broker, shut it down after your app has connected for the first time, and then start it again.

Apart from that, messages won't be lost if you configure your broker accordingly. MQTT has various QOS settings for that specific purpose.
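On the client side, this usually means a persistent session plus a QoS 1 (or 2) subscription. The following is only a sketch, assuming the paho-mqtt 1.x API from the question. TOPIC, persistent_client_kwargs and make_client are hypothetical names. Whether messages are actually queued while the client is offline also depends on the publisher's QoS and on the broker's own limits.

```python
TOPIC = "sensors/value"  # hypothetical; the question reads it from conf['topic']


def persistent_client_kwargs(client_id):
    # A persistent session is tied to the client ID, so the ID must be
    # non-empty and stay the same across restarts.
    if not client_id:
        raise ValueError("a persistent session needs a fixed, non-empty client_id")
    # clean_session=False asks the broker to keep subscriptions and
    # undelivered QoS 1/2 messages while this client is offline.
    return {"client_id": client_id, "clean_session": False}


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        # QoS 1 requests at-least-once delivery for this subscription.
        client.subscribe(TOPIC, qos=1)


def make_client(client_id):
    # Imported here so the helpers above work without paho installed.
    from paho.mqtt import client as mqtt_client  # paho-mqtt 1.x API assumed

    client = mqtt_client.Client(**persistent_client_kwargs(client_id))
    client.on_connect = on_connect
    return client
```

Because at-least-once delivery can produce duplicates, the ON CONFLICT DO NOTHING clause in the question's insert is a reasonable match for this setup, provided the table has a suitable unique constraint.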

If you think your app is the problem, and it's not some networking issue, you could get a somewhat more robust setup by deploying your app multiple times and letting the replicas subscribe via a shared subscription. https://www.hivemq.com/blog/mqtt5-essentials-part7-shared-subscriptions/

The Fool
  • " That's why subscriptions should be made in the on_connect callback." Thanks, I'll try that, it seems to be the solution. As for the second part: I shut down the server and then turned it on again, and the running script did not seem to receive a new on_connect() – Daniel R Feb 08 '22 at 11:27
  • "Apart from that, messages won't be lost if you configure your broker accordingly. MQTT has various QOS settings for that specific purpose." But that requires configuring the server, right? I don't think I have permission for that (it's not mine), but I will check it out – Daniel R Feb 08 '22 at 11:29
  • If you are not in control, you can't do much about it, I guess. If your app isn't connected, it isn't connected. Network issues can't be 100% prevented. It's an involuntary disruption. Potentially you can use a subscription group and deploy your app multiple times, if you think your code is the issue and not something from the outside. – The Fool Feb 08 '22 at 11:31
1

In my case, the reason I kept getting "on_connect" events in my client was that a second process had already connected with the same client_id. The MQTT broker was therefore alternately disconnecting the first and the second client, over and over, every 2 seconds, as each kept reconnecting with the same client_id.

See Two paho.mqtt clients subscribing to the same client localy
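One way to rule this out is to give every process its own client ID. Here is a small sketch using only the standard library; unique_client_id is a hypothetical helper, and the 23-character cap comes from the MQTT 3.1 specification, which matches the MQTTv31 protocol setting in the question.

```python
import uuid

MAX_CLIENT_ID_LEN = 23  # client ID length limit in the MQTT 3.1 specification


def unique_client_id(base):
    # A random suffix makes a collision between two processes very unlikely.
    suffix = uuid.uuid4().hex[:8]
    # Trim the base so that base + "-" + suffix fits within the limit.
    return f"{base[:MAX_CLIENT_ID_LEN - len(suffix) - 1]}-{suffix}"
```

The result can be passed as client_id when creating the client. A random suffix only suits clean sessions; if the broker is expected to remember the session across restarts, each process needs an ID that is distinct but stable.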

Michael Currie