pythonqmlpyqt5pyudev

Notify QML for 'usb device inserted' events using PyQt5 and pyudev


I have a GUI application (made with PyQt5 and QML) and would like to get notify when a usb device is plugged or unplugged from the computer. After some investigation, I have found that pyudev could be the library to use. But I have trouble using it with PyQt5 and QML. I have succeed to use the pyudev example for MonitorObservor, and there are other example provided in the documentation, here with PySide and here with Glib. I have also found an example using PyQt5 and widgets application here. But I have trouble implementing this on my PyQt5 QML application. I am sure it is very easy, so I think I'm just missing something but I can't found out what...

Here is what I have so far:

import sys
from PyQt5.QtWidgets import QApplication
from PyQt5.QtQml import QQmlApplicationEngine
from PyQt5.QtCore import QUrl
from pyudev import Context, Monitor, Device
from pyudev.pyqt5 import MonitorObserver
from Passerelle import *

# def device_connected(self, device):
def device_connected(self):
    print("Test")
    print("device action: ", device.action, ", path: ", device.device_path)
if __name__ == "__main__":
    app = QApplication(sys.argv)
    engine = QQmlApplicationEngine()
    p = Passerelle()
    engine.rootContext().setContextProperty("passerelle", p)
    engine.load(QUrl("main.qml"))
    if not engine.rootObjects:
        sys.exit(-1)

    context = Context()
    monitor = Monitor.from_netlink(context)
    # monitor.filter_by(subsystem='tty')
    observer = MonitorObserver(monitor)
    observer.deviceEvent.connect(device_connected)
    monitor.start()
    ret = app.exec_()
    sys.exit(ret)

I have succeeded to print "Test" on the console when unplugging or plugging back a device but can't seem to print the device information (TypeError: device_connected() missing 1 required positional argument: 'device' when I uncomment def device_connected(self, device):).

Here the first step would be to be able to print the device information on the console, then to find a way to notify the GUI and finally to notify the GUI only if the device plugged or unplugged has a specified VID/PID.

Edit: I have found a way to identify the device through VID PID using vid = device.get('ID_VENDOR_ID') and pid = device.get('ID_MODEL_ID')

For the 2nd step, I had thought of using the Passerelle class as the QML backend:

from PyQt5.QtCore import QObject, pyqtSlot, pyqtSignal#, pyqtProperty, QUrl
from pyudev import Context, Monitor
from pyudev.pyqt5 import MonitorObserver

def device_event(observer, device):
    print ("event ", device.action, " on device ", device)
class Passerelle(QObject):
    sendDeviceEvent = pyqtSignal(int)
    def __init__(self, parent=None):
        print("Passerelle constructor called")
        QObject.__init__(self, parent)
        print("end Passerelle constructor")
    @pyqtSlot()
    def setObserverForDeviceEvents(self):
        print("setObserverForDeviceEvents called")
        context = Context()
        monitor = Monitor.from_netlink(context)
        monitor.filter_by(subsystem='usb')
        observer = MonitorObserver(monitor)
        observer.deviceEvent.connect(self.device_connected)
        monitor.start()
        print("end setObserverForDeviceEvents")
    def device_connected(self, device):
        print("Test")
        print("device action: ", device.action, ", path: ", device.device_path)

But I am not sure it is a good idea as I read in that post that the monitor needed to be started before entering qt's main loop. Which I understand as : the monitor should be started in main.py before calling app.exec_()...

Thank you in advance for your help !


Solution

  • The best thing to do is to modify the GUI in QML, for which the Monitor and Device object must be accessible from QML. Only QObjects receive notifications so I will create 2 classes that wrap with a light layer to both classes using q-properties and slots.

    pyqtudev.py

    from PyQt5 import QtCore
    from pyudev import Context, Monitor, Device
    from pyudev.pyqt5 import MonitorObserver
    
    class QtUdevDevice(QtCore.QObject):
        def __init__(self, parent=None):
            super(QtUdevDevice, self).__init__(parent)
            self.m_dev = None
    
        def initialize(self, dev):
            self.m_dev = dev
    
        @QtCore.pyqtSlot(result=bool)
        def isValid(self):
            return self.m_dev is not None
    
        @QtCore.pyqtProperty(str, constant=True)
        def devType(self):
            if not self.isValid():
                return ""
            if self.m_dev.device_type is None:
                return ""
            return self.m_dev.device_type
    
        @QtCore.pyqtProperty(str, constant=True)
        def subsystem(self):
            if not self.isValid():
                return ""
            return self.m_dev.subsystem
    
        @QtCore.pyqtProperty(str, constant=True)
        def name(self):
            if not self.isValid():
                return ""
            return self.m_dev.sys_name
    
        @QtCore.pyqtProperty(str, constant=True)
        def driver(self):
            if not self.isValid():
                return ""
            if self.m_dev.driver is None:
                return ""
            return self.m_dev.driver
    
        @QtCore.pyqtProperty(str, constant=True)
        def deviceNode(self):
            if not self.isValid():
                return ""
            if self.m_dev.device_node is None:
                return ""
            return self.m_dev.device_node
    
        @QtCore.pyqtProperty(list, constant=True)
        def alternateDeviceSymlinks(self):
            return list(self.m_dev.device_links)
    
        @QtCore.pyqtProperty(str, constant=True)
        def sysfsPath(self):
            if not self.isValid():
                return ""
            return self.m_dev.sys_path
    
        @QtCore.pyqtProperty(int, constant=True)
        def sysfsNumber(self):
            if not self.isValid():
                return -1
            if self.m_dev.sys_number is None:
                return -1
            return int(self.m_dev.sys_number)
    
        @QtCore.pyqtSlot(str, result=str)
        def property(self, name):
            if not self.isValid():
                return ""
            v = self.m_dev.properties.get(name)
            return v if v is not None else ""
    
        @QtCore.pyqtSlot(str, result=bool)
        def hasProperty(self, name):
            if not self.isValid():
                return False
            return self.m_dev.properties.get(name) is not None
    
        @QtCore.pyqtProperty(list, constant=True)
        def deviceProperties(self):
            if not self.isValid():
                return []
            return list(self.m_dev.properties)
    
        @QtCore.pyqtProperty(list, constant=True)
        def sysfsProperties(self):
            if not self.isValid():
                return []
            return list(self.m_dev.attributes.available_attributes)
    
        @QtCore.pyqtProperty(QtCore.QObject, constant=True)
        def parentDevice(self):
            if not self.isValid:
                return 
            if self.m_dev.parent:
                parent_device = QtUdevDevice()
                parent_device.initialize(self.m_dev.parent)
                return parent_device
    
        @QtCore.pyqtProperty(str, constant=True)
        def action(self):
            if not self.isValid():
                return ""
            if self.m_dev.action is None:
                return ""
            return self.m_dev.action
    
        def __repr__(self):
            if self.isValid():
                return "UdevDevice({})".format(self.sysfsPath())
            return "Invalid UdevDevice"
    
    class QtMonitorObserver(QtCore.QObject):
        deviceEvent = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
        deviceAdded = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
        deviceRemoved = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
        deviceChanged = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
        deviceOnlined = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
        deviceOfflined = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    
        def __init__(self, parent=None):
            super(QtMonitorObserver, self).__init__(parent)
            context = Context()
            self._monitor = Monitor.from_netlink(context)
            self._observer = MonitorObserver(self._monitor, self)
            self._observer.deviceEvent.connect(self.setup_new_signals)
    
        @QtCore.pyqtSlot()
        def start(self):
            self._monitor.start()
    
        @QtCore.pyqtSlot(str)
        def filter_by(self, filter):
            self._monitor.filter_by(subsystem=filter)
    
        @QtCore.pyqtSlot(str)
        def filter_by_tag(self, tag):
            self._monitor.filter_by_tag(tag)
    
        @QtCore.pyqtSlot()
        def remove_filter(self):
            self._monitor.remove_filter()
    
        @QtCore.pyqtSlot(Device)
        def setup_new_signals(self, device):
            new_signals_map = {
                'add': self.deviceAdded,
                'remove': self.deviceRemoved,
                'change': self.deviceChanged,
                'online': self.deviceOnlined,
                'offline': self.deviceOfflined,
            }
            signal = new_signals_map.get(device.action)
            qtdevice = QtUdevDevice()
            qtdevice.initialize(device)
            if signal:
                signal.emit(qtdevice)
            self.deviceEvent.emit(qtdevice)
    

    main.py

    import os
    import sys
    from PyQt5 import QtCore, QtGui, QtQml
    
    from pyqtudev import QtMonitorObserver
    
    def run():
        app = QtGui.QGuiApplication(sys.argv)
        engine = QtQml.QQmlApplicationEngine()
        observer = QtMonitorObserver()
        engine.rootContext().setContextProperty("observer", observer)
        directory = os.path.dirname(os.path.abspath(__file__))
        engine.load(QtCore.QUrl.fromLocalFile(os.path.join(directory, 'main.qml')))
        if not engine.rootObjects():
            return -1
        return app.exec_()
    
    if __name__ == "__main__":
        sys.exit(run())
    

    main.qml

    import QtQuick 2.11
    import QtQuick.Window 2.2
    import QtQuick.Controls 2.2
    
    ApplicationWindow {    
        visible: true
        width: Screen.width/2
        height: Screen.height/2
        Connections {
            target: observer
            onDeviceEvent: {
                console.log(device, device.name, device.action, device.parentDevice)
                if(device.hasProperty("ID_VENDOR_ID")){
                    console.log(device.property("ID_VENDOR_ID"))
                }
            }
        }
        Component.onCompleted: {
            observer.start()
            observer.filter_by("usb")
        } 
    }