pythonmultithreadingsignals-slotsqthreadpyqt6

The attempt to terminate a function in a thread using signals fails in PyQt6


I have a time-consuming thread operation, but it cannot emit progress during the processing. So, I use another thread to simulate its progress. When the time-consuming operation is completed, it emits an end signal and simultaneously signals the end of the simulation process. However, the actual function operation in the simulated thread is not controlled. In the example below, self.thread_two_stop_signal.emit(), however, handle_two() is still running.

import sys
import time

from PyQt6.QtCore import QObject, pyqtSignal, QThread
from PyQt6.QtWidgets import QApplication, QMainWindow, QProgressBar, QPushButton


class ThreadOne(QObject):
    done_signal = pyqtSignal()
    finished_signal = pyqtSignal()

    def __init__(self):
        super().__init__()

    def run(self):
        for i in range(100):
            time.sleep(0.01)
        self.done_signal.emit()

    def finished(self):
        self.finished_signal.emit()


class ThreadTwo(QObject):
    finished_signal = pyqtSignal()
    progress_signal = pyqtSignal(int)

    def __init__(self):
        self.if_finished = False
        super().__init__()

    def run(self):
        i = 0
        while True:
            if self.if_finished or i == 99:
                self.progress_signal.emit(i)
                return
            i += 1
            self.progress_signal.emit(i)
            time.sleep(0.1)

    def finished(self):
        self.finished_signal.emit()

    def reset(self):
        self.if_finished = False

    def stop(self):
        print("stop")
        self.if_finished = True


class MainWindow(QMainWindow):

    thread_one_do_signal = pyqtSignal()
    thread_one_finished_signal = pyqtSignal()

    thread_two_do_signal = pyqtSignal()
    thread_two_reset_signal = pyqtSignal()
    thread_two_stop_signal = pyqtSignal()
    thread_two_finished_signal = pyqtSignal()

    def __init__(self):
        super().__init__()

        self.setWindowTitle("My PyQt6 App")

        self.setGeometry(100, 100, 400, 200)

        self.btn = QPushButton("Start", self)
        self.btn.setGeometry(150, 25, 50, 50)

        self.bar = QProgressBar(self)
        self.bar.setGeometry(50, 100, 300, 20)

        self.btn.clicked.connect(self.start)

        self.proxy_thread_one = QThread()
        self.thread_one = ThreadOne()
        self.thread_one.moveToThread(self.proxy_thread_one)

        self.thread_one_do_signal.connect(self.thread_one.run)
        self.thread_one_finished_signal.connect(self.thread_one.finished)

        self.thread_one.done_signal.connect(self.handle_one)
        self.thread_one.finished_signal.connect(self.proxy_thread_one.quit)
        self.thread_one.finished_signal.connect(self.proxy_thread_one.wait)

        self.proxy_thread_two = QThread()
        self.thread_two = ThreadTwo()
        self.thread_two.moveToThread(self.proxy_thread_two)

        self.thread_two_do_signal.connect(self.thread_two.run)
        self.thread_two_stop_signal.connect(self.thread_two.stop)
        self.thread_two_reset_signal.connect(self.thread_two.reset)
        self.thread_two_finished_signal.connect(self.thread_two.finished)

        self.thread_two.progress_signal.connect(self.handle_two)
        self.thread_two.finished_signal.connect(self.proxy_thread_two.quit)
        self.thread_two.finished_signal.connect(self.proxy_thread_two.wait)

    def start(self):
        self.proxy_thread_one.start()
        self.proxy_thread_two.start()

        self.thread_one_do_signal.emit()
        self.thread_two_reset_signal.emit()
        self.thread_two_do_signal.emit()

    def handle_one(self):
        self.thread_two_stop_signal.emit()  
        self.thread_two_finished_signal.emit()
        self.thread_one_finished_signal.emit() 
        self.bar.setValue(100)

    def handle_two(self, value):
        self.bar.setValue(value)


if __name__ == "__main__":
    app = QApplication(sys.argv) 
    main_window = MainWindow()  
    main_window.show() 
    sys.exit(app.exec())

After I disconnect the singal, it started working.

    def handle_one(self):
        self.thread_two.progress_signal.disconnect(self.handle_two)
        self.thread_two_stop_signal.emit()
        self.thread_two_finished_signal.emit()
        self.thread_one_finished_signal.emit()
        self.bar.setValue(100)

I want to understand why my previous approach didn't work and if there is a solution for it.

Additionally, I would like to inquire about my usage of threads in PyQt6. Is my approach correct? Can it exit gracefully? Are there any potential thread safety issues?


Solution

  • The main problem with the example is that it uses blocking loops within each thread, which will prevent immediate processing of thread-local events.

    When a signal is emitted across threads, an event will be posted to the event-loop of the receiving thread. But if a blocking loop is being executed within the receiving worker thread, it will freeze event-processing in exactly the same way as it would within the main GUI thread. So steps must be taken to explictly enforce prcessing of such thread-local events. The simplest way to achieve this is to call QApplication.processEvents(), like so:

    class ThreadTwo(QObject):
        ...
        def run(self):
            i = 0
            while True:
                QApplication.processEvents()
                if self.if_finished or i == 99:
                    self.progress_signal.emit(i)
                    return
                ...
    

    As for the more general question of whether threads are used "correctly" in the example: this is largely a matter of opinion/taste. The problem mentioned above could be completely by-passed by removing most of the custom signals and calling stop() directly instead. Strictly speaking, this would mean that modifying the if_finished attribute was no longer thread-safe - but given that only one thread ever needs to read the value of if_finished, this would make no measurable difference to the reliability of the code. In fact, it could be argued that the resulting simplification would make the code easier to understand and maintain, and thus qualitatively more reliable in that sense. To give some idea of how such code might look, try the re-written example bewlow:

    import sys, random
    
    from PyQt6.QtCore import QObject, pyqtSignal, QThread
    from PyQt6.QtWidgets import (
        QApplication, QMainWindow, QProgressBar, QPushButton,
        QWidget, QHBoxLayout,
        )
    
    
    class WorkerOne(QObject):
        finished = pyqtSignal()
    
        def run(self):
            delay = random.randint(25, 50)
            for i in range(100):
                QThread.msleep(delay)
            self.finished.emit()
    
    
    class WorkerTwo(QObject):
        progress = pyqtSignal(int)
    
        def run(self):
            self._stopped = False
            for i in range(1, 101):
                QThread.msleep(50)
                if not self._stopped:
                    self.progress.emit(i)
                else:
                    self.progress.emit(100)
                    break
    
        def stop(self):
            print('stop')
            self._stopped = True
    
    
    class MainWindow(QMainWindow):
        def __init__(self):
            super().__init__()
            self.setWindowTitle("My PyQt6 App")
            self.setGeometry(600, 200, 400, 50)
            widget = QWidget()
            layout = QHBoxLayout(widget)
            self.btn = QPushButton("Start")
            self.bar = QProgressBar()
            layout.addWidget(self.bar)
            layout.addWidget(self.btn)
            self.setCentralWidget(widget)
            self.btn.clicked.connect(self.start)
    
            self.thread_one = QThread()
            self.worker_one = WorkerOne()
            self.worker_one.moveToThread(self.thread_one)
            self.thread_one.started.connect(self.worker_one.run)
            self.worker_one.finished.connect(self.handle_finished)
    
            self.thread_two = QThread()
            self.worker_two = WorkerTwo()
            self.worker_two.moveToThread(self.thread_two)
            self.thread_two.started.connect(self.worker_two.run)
            self.worker_two.progress.connect(self.bar.setValue)
    
        def start(self):
            if not (self.thread_one.isRunning() or
                    self.thread_two.isRunning()):
                self.thread_one.start()
                self.thread_two.start()
    
        def handle_finished(self):
            self.worker_two.stop()
            self.reset()
    
        def reset(self):
            self.thread_one.quit()
            self.thread_two.quit()
            self.thread_one.wait()
            self.thread_two.wait()
    
        def closeEvent(self, event):
            self.reset()
    
    
    if __name__ == "__main__":
    
        app = QApplication(sys.argv)
        main_window = MainWindow()
        main_window.show()
        sys.exit(app.exec())