python multithreading exception paho

Python: propagate exceptions from paho (MQTT) thread to the main thread


I have a Python application running on a Raspberry Pi. It uses threads to receive topics from an MQTT broker and to monitor messages on a UART.

The paho MQTT library's interface offers callbacks to handle receiving messages, and a loop_start() that apparently starts a new thread then returns.

My issue is that I want any exception raised in the reception callback to propagate all the way to the "top" of the application, so that I can cleanly terminate the whole application.

As it is now, an exception in the paho thread terminates that thread, but the other threads are kept alive, so the application keeps running without being functional. This is bad: I count on a systemd service to restart the crashed application, which is why I need it to actually crash.

I have done this for the UART RX thread:

class MyClassError(Exception):
    def __init__(self, message):
        self.message = f"{message}"
        super().__init__(self.message)


class MyClass:
    # other stuff...
    def rx(self):
        try:
            while True:
                do_your_thing()
        except Exception as e:
            self.exc = MyClassError(f"An unexpected error occurred: {e}")
            return

    def run(self):
        self.rx_thread.start()
        self.rx_thread.join()
        if self.exc is not None:
            raise self.exc

This works, with the caveat that run() blocks until the RX thread ends. In return, I can catch the exception in the caller thread.

There are alternatives to achieve a similar effect, such as try/except/sys.exit(1) in the callback code, or maybe have the main thread monitor the state of the paho thread... but it feels like I should be able to raise exceptions in the sub-thread, and catch them in the parent thread.
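One way to sketch the "raise in the sub-thread, catch in the parent" idea is to wrap each callback so that it hands its exception to the main thread through a queue. Note that calling sys.exit() from a non-main thread only raises SystemExit in that thread and does not end the process. The names error_queue, forward_exceptions and wait_for_failure below are hypothetical, and on_message merely imitates the shape of a paho callback; this is a minimal sketch, not paho's own mechanism.

```python
import functools
import queue
import threading

# Hypothetical names for illustration: error_queue, forward_exceptions,
# wait_for_failure. on_message only imitates the shape of a paho callback.
error_queue = queue.Queue()


def forward_exceptions(callback):
    """Wrap a callback so any exception is handed over to the main thread."""
    @functools.wraps(callback)
    def wrapper(*args, **kwargs):
        try:
            return callback(*args, **kwargs)
        except Exception as exc:
            # Do not let the exception die with the worker thread.
            error_queue.put(exc)
    return wrapper


@forward_exceptions
def on_message(client, userdata, message):
    raise ValueError("bad payload")


def wait_for_failure(timeout=None):
    """Block in the main thread until a worker reports an exception, then re-raise it."""
    try:
        exc = error_queue.get(timeout=timeout)
    except queue.Empty:
        return  # nothing failed within the timeout
    raise exc
```

The main thread would call wait_for_failure() after starting its workers, so a failure in any wrapped callback surfaces there as a normal exception. This only covers callbacks that are explicitly wrapped, not errors inside the library's own loop.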

Moreover, I would like to catch and raise any exception in any of the MQTT code, not only that specific callback. But that'd be for step 2.

How do I escalate the exception occurring in the paho library (or the callbacks it calls), to the main thread so that it can terminate everything?


Solution

  • My current solution is to catch uncaught thread exceptions at top level with threading.excepthook (available since Python 3.8), this way:

    
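    import threading
    import traceback

    def handle_exceptions(args):
        # args carries exc_type, exc_value, exc_traceback and thread
        # for the uncaught exception.
        try:
            my_object.stop()
        except Exception as stop_error:
            # A distinct name keeps args intact for the traceback below.
            print(stop_error)
        try:
            mqtt_manager.stop()
        except Exception as stop_error:
            print(stop_error)
        # print_exception writes to stderr itself and returns None.
        traceback.print_exception(args.exc_type, args.exc_value, args.exc_traceback)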
    
    threading.excepthook = handle_exceptions
    

    After this stop, the MQTT loop takes several seconds to die (I'd like to know why; is the loop only exiting at its next event?), but the whole application does terminate in the end.