python-3.x mqtt paho pyinotify

Interrupt paho mqtt client to reload subscriptions


I have an mqtt client app that subscribes to topics based on a configuration file. Something like:

def connectMQTT():
    global Connection
    Connection = Client()
    Connection.on_message = handleQuery
    for clientid in clientids.allIDs(): # clientids.allIDs() reads files to get this
        topic = '{}/{}/Q/+'.format(Basename, clientid)
        print('subscription:', topic)
        Connection.subscribe(topic)

I have been using it with a simple invocation like:

def main():
    connectMQTT()
    Connection.loop_forever()

loop_forever blocks indefinitely. I'd like to notice when the information read by clientids.allIDs() has gone stale, and then reconnect so that the client subscribes afresh.

I can detect a change in the files with pyinotify:

def filesChanged():
    # NOT SURE WHAT TO DO HERE
    pass

def watchForChanges():
    watchManager = pyinotify.WatchManager()
    # FileEventHandler (not shown) calls the given callback on each event
    notifier = pyinotify.ThreadedNotifier(watchManager, FileEventHandler(filesChanged))
    notifier.start()
    watchManager.add_watch('/etc/my/config/dir', pyinotify.IN_CLOSE_WRITE | pyinotify.IN_DELETE)

Basically, I need loop_forever (or some other paho mqtt mechanism) to run until some signal comes from the pyinotify machinery. I'm not sure how to weld those two together, though. In pseudocode, I think I want something like:

def main():
    signal = setup_directory_change_signal()
    while True:
        connectMQTT()
        Connection.loop(until=signal)
        Connection.disconnect()

I'm not sure how to effect that though.


Solution

  • I finally circled around to the following solution, which seems to work. I had been trying to run the notifier in another thread and the mqtt loop in the main thread; the trick was to invert that setup, so that the mqtt loop runs in a background thread and the notifier loop runs in the main thread:

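    Connection = None  # module-level default so the first restartMQTT() call works

    def restartMQTT():
        if Connection:
            Connection.disconnect()  # close the old session cleanly
            Connection.loop_stop()   # stop the old client's network thread
        connectMQTT()                # stores a fresh client in Connection
        Connection.loop_start()      # run its network loop in a background thread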
    
    class FileEventHandler(pyinotify.ProcessEvent):
        def process_IN_CREATE(self, fileEvent):
            restartMQTT()
    
        def process_IN_DELETE(self, fileEvent):
            restartMQTT()
    
    
    def main():
        restartMQTT()
        watchManager = pyinotify.WatchManager()
        notifier = pyinotify.Notifier(watchManager, FileEventHandler())
        watchManager.add_watch('/etc/my/config_directory', pyinotify.IN_CREATE | pyinotify.IN_DELETE)
        notifier.loop()
    

    Here, connectMQTT stores a newly connected and configured MQTT client in the Connection global.
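
For comparison, the pseudocode in the question (loop until a signal arrives, then reconnect) can also be approximated with a threading.Event. This is only a sketch under stated assumptions: run_until_changed is a hypothetical helper, not part of paho or pyinotify, and connect stands for any function that returns a connected client exposing loop_start(), loop_stop() and disconnect(), as paho's Client does. The watcher thread would call changed.set() from its event handler.

```python
import threading


def run_until_changed(connect, changed, stop=None):
    """Reconnect every time `changed` is set; return once `stop` is set.

    connect: callable returning a client with loop_start(), loop_stop()
             and disconnect() (hypothetical stand-in for connectMQTT,
             assumed here to return the client it creates).
    changed: threading.Event set by the file watcher thread.
    stop:    optional threading.Event used for a clean shutdown.
    Returns the number of connect/disconnect cycles completed.
    """
    stop = stop or threading.Event()
    cycles = 0
    while not stop.is_set():
        client = connect()
        client.loop_start()   # network loop runs in a background thread
        changed.wait()        # main thread sleeps until the watcher signals
        changed.clear()
        client.disconnect()   # ask the old session to close
        client.loop_stop()    # then stop its network thread
        cycles += 1
    return cycles
```

With this arrangement the notifier stays in its own thread (as with ThreadedNotifier in the question) and only needs to set the event. To shut down, set stop first and then changed, so that the waiting main thread wakes up and sees the stop flag.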