python mqtt paho

Python paho-mqtt client crashes nondeterministically


I'm trying to get a basic Python Paho MQTT client to work. Unfortunately, Paho keeps crashing with different errors instead of handling the MQTT messages.

python client code (mostly taken from the python paho doc):

from paho.mqtt import client as mqtt_client

# Connection settings for the MQTT broker running on this machine.
broker = '127.0.0.1'
port = 1883
topic = "home/sensors/dht22"
# Fixed client identifier; a plain literal, since nothing is interpolated.
client_id = 'python-mqtt-dht22'

# Password passed to username_pw_set() together with the user name "simon".
PASSWORD = "password"

def connect_mqtt():
    """Create a paho MQTT client, set its credentials and connect to the broker.

    Uses the module-level ``broker``, ``port``, ``client_id`` and ``PASSWORD``
    settings.

    Returns:
        The configured client, with ``connect()`` already called. No network
        loop is started here; the caller is responsible for running one.
    """
    def on_connect(client, userdata, flags, rc):
        # A return code of 0 is treated as success; anything else is
        # reported as a failed connection attempt.
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            # print() does not apply %-formatting to its arguments, so the
            # return code has to be interpolated into the message explicitly.
            print(f"Failed to connect, return code {rc}\n")

    client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1, client_id)

    client.username_pw_set("simon", PASSWORD)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

def subscribe(client: mqtt_client.Client):
    """Install the message handler on ``client`` and subscribe to ``topic``.

    Args:
        client: The paho client to subscribe with, e.g. the one returned by
            ``connect_mqtt()``.
    """
    def on_message(client, userdata, msg):
        # Decode the raw payload bytes with the default codec, log the
        # message, then pass the text on to handle_msg().
        # NOTE(review): handle_msg is not defined in this excerpt; it is
        # presumably defined elsewhere in the full script.
        payload = msg.payload.decode()
        print(f"Received `{payload}` from `{msg.topic}` topic")
        handle_msg(payload)

    # Register the callback before subscribing so that a message delivered
    # right after the subscription is acknowledged already has a handler.
    client.on_message = on_message
    client.subscribe(topic)

def run():
    """Connect to the broker, subscribe to the topic and process messages.

    NOTE(review): this function runs two network loops on the same client
    (see the comments below). The tracebacks that follow show exactly that:
    one loop_forever() call chain in the paho background thread and one in
    the main thread.
    """
    client = connect_mqtt()
    # Starts paho's network loop in a background thread.
    client.loop_start()
    subscribe(client)
    # Runs a second network loop in the calling thread, so two threads end
    # up handling packets for the same client at once.
    client.loop_forever()

# Only start the client when the file is executed as a script.
if __name__ == '__main__':
    run()

Running this together with mosquitto nondeterministically results in either

Exception in thread paho-mqtt-client-python-mqtt-dht22:
Traceback (most recent call last):
  File "/usr/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 4523, in _thread_main
    self.loop_forever(retry_first_connection=True)
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 2297, in loop_forever
    rc = self._loop(timeout)
         ^^^^^^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 1686, in _loop
    rc = self.loop_read()
         ^^^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 2100, in loop_read
    rc = self._packet_read()
         ^^^^^^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 3142, in _packet_read
    rc = self._packet_handle()
         ^^^^^^^^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 3808, in _packet_handle
    return self._handle_publish()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 4099, in _handle_publish
    (slen, packet) = struct.unpack(pack_format, self._in_packet['packet'])
struct.error: bad char in struct format

or in

Exception in thread paho-mqtt-client-python-mqtt-dht22:
Traceback (most recent call last):
  File "/usr/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
Traceback (most recent call last):
  File "/home/simon/code/mqtt_sink.py", line 76, in <module>
    self.run()
  File "/usr/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 4523, in _thread_main
    run()
    self.loop_forever(retry_first_connection=True)
  File "/home/simon/code/mqtt_sink.py", line 71, in run
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 2297, in loop_forever
    client.loop_forever()
    rc = self._loop(timeout)
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 2297, in loop_forever
         ^^^^^^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 1686, in _loop
    rc = self._loop(timeout)
         ^^^^^^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 1686, in _loop
    rc = self.loop_read()
    rc = self.loop_read()
         ^^^^^^^^^^^^^^^^
         ^^^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 2100, in loop_read
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 2100, in loop_read
    rc = self._packet_read()
    rc = self._packet_read()
         ^^^^^^^^^^^^^^^^^^^
         ^^^^^^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 3142, in _packet_read
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 3142, in _packet_read
    rc = self._packet_handle()
    rc = self._packet_handle()
         ^^^^^^^^^^^^^^^^^^^^^
         ^^^^^^^^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 3816, in _packet_handle
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 3816, in _packet_handle
    self._handle_suback()
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 4053, in _handle_suback
    self._handle_suback()
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/client.py", line 4053, in _handle_suback
    reasoncodes = [ReasonCode(SUBACK >> 4, identifier=c) for c in granted_qos]
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/reasoncodes.py", line 129, in __init__
    reasoncodes = [ReasonCode(SUBACK >> 4, identifier=c) for c in granted_qos]
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/reasoncodes.py", line 129, in __init__
    self.getName()  # check it's good
    self.getName()  # check it's good
    ^^^^^^^^^^^^^^
    ^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/reasoncodes.py", line 172, in getName
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/reasoncodes.py", line 172, in getName
    return self.__getName__(self.packetType, self.value)
    return self.__getName__(self.packetType, self.value)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/reasoncodes.py", line 140, in __getName__
  File "/home/simon/.avenv/mqtt/lib/python3.12/site-packages/paho/mqtt/reasoncodes.py", line 140, in __getName__
    raise KeyError(identifier)
    raise KeyError(identifier)
KeyError: 48

Either way it doesn't work. I've searched a bit, but haven't found anything useful on the internet, yet.

The mosquitto server and the message source work, as mosquitto_sub -u simon -P password -t "home/sensors/dht22" -i spy displays the sent messages.

What is wrong with my code? Any ideas?

EDIT 2024-11-13: I've managed to get it to work by removing client.loop_start(). So if you have similar problems: run only one network loop, i.e. call either loop_start() or loop_forever(), not both (here, just loop_forever()).


Solution

  • def run():
        client = connect_mqtt()
        client.loop_start()
        subscribe(client)
        client.loop_forever()
    
    

    You are starting two network loops (one with loop_start(), which runs in a background thread, and a second with loop_forever(), which runs in the main thread). This will cause issues like the ones you are seeing because two threads will be attempting to process the data received from the broker at the same time (and there will be data races).

    Removing the client.loop_start() call should fix this. I would also suggest that you move the subscribe call into on_connect (as per the examples), so that the subscription is re-established whenever the client reconnects.