My script reads data from MQTT server and writes it to postgres table.
I'm using loop_forever()
.
The program is supposed to run nonstop.
When the first connection is received everything works fine, but after some time (from minutes to days) on_connect()
is called again. The program works (in the meaning that there is no error in connection) but no meassages are received any more.
In order to debug I tried following:
- induce disconnect by switching off and on the network connection
- shutting off and on the server
- calling
client.disconnect()
To my surprise first and second thing did nothing - there was no logs about new connection and the running program just kept running after the connection revived.
The third attempt was unsuccesfull, I couldn't make it work.
Other remarks:
- I tried using
loop_start()
instead ofloop_forever
but was not succesfull with that at all
So basically the questions are:
- how to counter-act ?
- how to disconnect manually to replicate the problem of calling
on_connect
(and loosing incoming data)
My code:
import json
import sys
from paho.mqtt import client as mqtt_client
import psycopg2
import logging as log
from datetime import datetime
import certifi
from collections import defaultdict
def connect_mqtt(userdict) -> mqtt_client:
def on_connect(client, userdata, flags, rc):
log.info(f"{datetime.now()}: Trying connect")
if rc == 0:
log.info(f"{datetime.now()}: Connection returned result: " + mqtt_client.connack_string(rc))
else:
log.info("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id=conf['client_id'], protocol=mqtt_client.MQTTv31, userdata=userdict)
client.tls_set(certifi.where())
client.tls_insecure_set(True)
client.username_pw_set(conf['username'], conf['password'])
client.on_connect = on_connect
client.connect(conf['broker'], conf['port'])
return client
def on_message(client, userdata, msg):
now_ts_in_s = round(datetime.timestamp(datetime.now()))
now_dt_in_s = datetime.fromtimestamp(now_ts_in_s)
try:
value = float(msg.payload.decode())
data = [now_dt_in_s, value]
insert_to_psql(userdata['conn'], data)
except ValueError:
pass
def insert_to_psql(conn, data):
cursor = conn.cursor()
insert_query = "INSERT INTO data (time, value) VALUES (%s, %s) ON CONFLICT " \
"DO " \
"NOTHING;"
cursor.execute(insert_query, data)
conn.commit()
def run():
psql_conn = "postgres://postgres:blablabla"
conn = psycopg2.connect(psql_conn)
userdict = {'collected_data': defaultdict(list), 'conn': conn, 'first_conn': True}
client = connect_mqtt(userdict)
client.subscribe(conf['topic'])
client.on_message = on_message
try:
client.loop_forever()
finally:
client.disconnect()
conn.close()
if __name__ == '__main__':
with open(sys.argv[1]) as f:
conf = json.load(f)
run()