pythonmqttmosquittolibmosquitto

How to accumulate messages as mqtt client for 1 second, then save it to a file


my problem is as follows: I wrote a program that subscribes to a topic, where 2 dictionaries with one key respectively arrive more times a second. On every message they change their value. I save those dictionaries in a big buffer-dictionary called "Status". What I need is to save a "snapshot" of Status every second into a file.

I tried time.sleep(1) but it drifts. And I don't know how to handle the problem with a schedule due to the already existing client-loop...

I'm pretty new to python and mqtt and would appreciate your help

My code:

import paho.mqtt.client as mqtt
import time
import json

Status = {}

#create client instance
client = mqtt.Client(client_id=None, clean_session=True, transport="tcp")

#connect to broker
client.connect("my_broker", 1883)

#use subscribe() to subscribe to a topic and receive messages
client.subscribe("topic/#", qos=0)

def test1_callback(client, userdata, msg):
    msg_dict = json.loads((msg.payload))
    Status.update(msg_dict)

client.message_callback_add("topic/test1", test1_callback)

while True:
    client.loop_start()
    time.sleep(1)
    client.loop_stop()

    with open('Data.txt', 'a+') as file:
        t = time.localtime()
        Status["time"]= time.strftime("%H:%M:%S", t)
            file.write(str(Status["time"]) + " ")
            file.write(str(Status["key1"]) + " ")
            file.write(str(Status["key2"]) + " ")

    client.loop_start()

Solution

  • Instead of manually stopping the networking thread I would prefer using a timer which fires every second. In addition it might be a good idea to lock the data when storing it to a file - otherwise there might occur an update in between:

    # ...
    import threading
    
    def test1_callback(client, userdata, msg):
       msg_dict = json.loads((msg.payload))
       lock.acquire()
       Status.update(msg_dict)
       lock.release()
    
    def timer_event():
       lock.acquire()
       # save to file here
       lock.release()
       # restart timer
       threading.Timer(1, timer_event).start()
    
    Status = {}
    lock = threading.Lock()
    
    # client initialization
    # ...
    
    client.loop_start()
    threading.Timer(1, timer_event).start()
    
    while True:
       pass
    

    But this won't prevent your stored value to drift away because the topic is apparently published too frequently so your subscriber (or even the broker) is not able to handle a message fast enough.

    So you might want to reduce the interval in which this topic is published. Also notice that you subscribed to a multi-level topic - even if the topics besides "topic/test1" are not handled in your code they still cause load for the broker and the subscribing client