python-3.x pyqt5 python-multithreading qthread qprocess

QProcess readyReadStandardOutput slot is not called when a QThread is started from another QThread


When I use AllocateParalle to limit the number of tasks running at once, Task's start_output and output print nothing. When I run a single Task directly, both start_output and output work as expected.

from PyQt5.QtCore import *
from PyQt5.QtWidgets import QApplication
from collections import deque
import threading
import time
import sys

class AllocateParalle(QThread):
    def __init__(self, queue_test):
        super().__init__()
        self.queue_test = queue_test
        self.activate = 0
        self.max_activate = 1
        self.time = 0
        self.lock = threading.Lock()
        self.list_thread = []

    def run(self):
        while len(self.queue_test)!=0:
            if self.activate < self.max_activate:
                with self.lock:
                    self.activate += 1
                    self.time += 1
                    cmd = self.queue_test.popleft()
                thread = Task(cmd, self.time)
                thread.finished.connect(self.finished_process)
                self.list_thread.append(thread)
                thread.start()
            else:
                time.sleep(3)

    def finished_process(self):
        with self.lock:
            self.activate-=1

class Task(QThread):
    def __init__(self, cmd, idx):
        super().__init__()
        self.lsf_task_process = None
        self.case_cmd = cmd
        self.idx = idx
        self.lock = threading.Lock()

    def run(self):
        self.lsf_task_process = QProcess()
        self.lsf_task_process.setProcessChannelMode(QProcess.MergedChannels)

        env = QProcessEnvironment.systemEnvironment()
        env.insert('PATH', '/usr/bin:'+env.value('PATH'))
        self.lsf_task_process.setProcessEnvironment(env)

        self.lsf_task_process.readyReadStandardOutput.connect(self.output)
        self.lsf_task_process.started.connect(self.start_output)
        self.lsf_task_process.finished.connect(lambda exitcode: self.process_finished(exitcode))

        self.lsf_task_process.start('bash', ['-c', self.case_cmd])
        self.lsf_task_process.waitForFinished(-1)

    def start_output(self):
        print('start remote')

    def output(self):
        while self.lsf_task_process.canReadLine():
            line_data = self.lsf_task_process.readLine().data().decode('utf-8', errors = 'replace').strip()
            print(f'[{self.idx}] {line_data}')

    def process_finished(self, exitcode):
        if exitcode!=0:
            print(f'[{self.idx}] error')
        else:
            print(f'[{self.idx}]success')

if __name__ =="__main__":
    app = QApplication(sys.argv)
    queue = deque()
    cmd = "for i in {1..5}; do echo 'Hello, world!'; sleep 1; done"
    for i in range(3):
        queue.appendleft(cmd)
    thread = AllocateParalle(queue)
    # thread = Task(cmd, 0)
    thread.start()
    sys.exit(app.exec())

I tried calling output() from process_finished, but then all messages are printed at once when the process ends, without real-time updates, which is not what I want.

    def process_finished(self, exitcode):
        self.output()
        if exitcode!=0:
            print(f'[{self.idx}] error')
        else:
            print(f'[{self.idx}]success')

Solution

  • The problem is caused by the hierarchy of QThreads and how their signals are eventually processed based on their thread affinity:

    When a QObject receives a queued signal or a posted event, the slot or event handler will run in the thread that the object lives in.

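    When you just create and start a Task from the main script, the Task object itself lives in the main thread (a QThread object belongs to the thread that created it, not to the thread it manages), while the QProcess created inside run() lives in the new thread. The signals emitted by the QProcess are therefore queued for the thread the Task object lives in.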

    When the Task objects are created within AllocateParalle.run(), they live in the AllocateParalle thread, even though the connections are made within each Task's own run(). This seems a bit counterintuitive, but it follows from a basic Qt rule: a QObject lives in the thread in which it was created, unless it is explicitly moved.

    Note that there is no such thing as "nesting" in multi-threading: threads are not children of one another. What Qt tracks is the thread each QObject lives in, and a queued slot is executed by the event loop of the thread its receiver lives in.

    When using the "one Task object" approach, only two threads exist, and the slots are actually executed in the main thread: that thread has its own event loop and is then able to process those signals.

    When using the "sub-Task objects" approach, the slots are executed in the AllocateParalle thread, which does not have an event loop (because its run has been overridden), and then the signals connected to its slots are queued and never processed.
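The difference between the two cases can be sketched without Qt at all. The following is only a toy model in plain Python, not Qt's implementation: `ToyReceiver`, `post()` and `process_events()` are hypothetical names standing in for a QObject, a queued signal and one pass of an event loop.

```python
import queue
import threading


class ToyReceiver:
    """Stand-in for a QObject whose slots run in the thread it lives in."""

    def __init__(self):
        self.pending = queue.Queue()  # queued "slot calls" awaiting delivery
        self.received = []

    def post(self, text):
        # Analogous to a queued signal: store the call, do not run it yet.
        self.pending.put(text)

    def process_events(self):
        # Analogous to one pass of an event loop: deliver whatever is queued.
        while True:
            try:
                text = self.pending.get_nowait()
            except queue.Empty:
                return
            self.received.append(text)


def emitter(receiver):
    # Runs in another thread, like the QProcess signals emitted in Task.run().
    for i in range(3):
        receiver.post(f"line {i}")


receiver = ToyReceiver()
worker = threading.Thread(target=emitter, args=(receiver,))
worker.start()
worker.join()

delivered_without_loop = list(receiver.received)  # nothing was delivered
receiver.process_events()                         # the "event loop" finally runs
delivered_with_loop = list(receiver.received)

print(delivered_without_loop)  # []
print(delivered_with_loop)     # ['line 0', 'line 1', 'line 2']
```

As long as nobody calls `process_events()`, the posted calls just accumulate, which mirrors what happens to signals queued for a thread that never runs an event loop.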

    Here are some possible ways to work around that.

    Ensure that slots are always connected in the same thread

    This can be achieved by forcing the Qt.DirectConnection:

        self.lsf_task_process.readyReadStandardOutput.connect(
            self.output, Qt.DirectConnection)
        self.lsf_task_process.started.connect(
            self.start_output, Qt.DirectConnection)
    

    Alternatively, use a lambda, which ensures that the function is always executed in the same thread in which the connection was created, circumventing thread affinity:

        self.lsf_task_process.readyReadStandardOutput.connect(
            lambda: self.output())
        self.lsf_task_process.started.connect(
            lambda: self.start_output())
    

    Interestingly enough, that is why the finished connection in your code works: it already uses a lambda.

    Enforce the thread affinity

    You can force the thread to be "moved to itself", which will ensure that the connected slots created within the thread are executed in the same thread:

    class Task(QThread):
        def __init__(self, cmd, idx):
            ...
            self.moveToThread(self)
    

    Note that this practice is normally discouraged (at least in the C++ world), but may be acceptable in Python under certain circumstances. You must be aware of its possible consequences, but as long as QThread is subclassed by overriding run() in simple ways, there should be no harm in doing this.

    Create an event loop and process its events

    This is a slightly more advanced concept, and probably not always desirable, but it follows the QThread concept and is, in my opinion, a bit more elegant.

    The concept is relatively simple: create a QEventLoop within run() and process its events whenever required. This is fundamentally what QThread does on its own when run() is not overridden: the default implementation calls exec(), which creates an event loop and starts it.

    The difference is that we have to manually call processEvents() at regular "intervals", because that's the requirement of the implementation: first, do it every time the original while loop iterates; then do it until every thread is finished.

    class AllocateParalle(QThread):
        ...
        def run(self):
            loop = QEventLoop()
    
            while len(self.queue_test)!=0:
                if self.activate < self.max_activate:
                    with self.lock:
                        self.activate += 1
                        self.time += 1
                        cmd = self.queue_test.popleft()
                    thread = Task(cmd, self.time)
                    thread.finished.connect(self.finished_process)
                    self.list_thread.append(thread)
                    thread.start()
    
                else:
                    time.sleep(1)
    
                loop.processEvents()
    
            while any(thread.isRunning() for thread in self.list_thread):
                loop.processEvents()
    

    Note that I reduced the time.sleep() to 1 for consistency with the bash script; reverting it to 3 results in inconsistent print output (the process_finished slot is called before all output signals have been processed). For your current script the smaller value is recommended anyway: it is better to check frequently at short intervals than to wait for arbitrary long delays.

    Still, some complex situations may require a longer interval. In that case, slightly modify the above code so that it keeps checking for events while the interval elapses.

    class AllocateParalle(QThread):
        ...
        def run(self):
            et = QElapsedTimer()
            waitDelay = 3000 # three seconds
    
            loop = QEventLoop()
            while len(self.queue_test)!=0:
                if self.activate < self.max_activate:
                    with self.lock:
                        self.activate += 1
                        self.time += 1
                        cmd = self.queue_test.popleft()
                    thread = Task(cmd, self.time)
                    thread.finished.connect(self.finished_process)
                    self.list_thread.append(thread)
                    thread.start()
    
                else:
                    et.start()
                    while et.elapsed() < waitDelay:
                        loop.processEvents()
                        # arbitrary value, it could be 1 in this case
                        time.sleep(.1)
    
                loop.processEvents()
    
            while any(thread.isRunning() for thread in self.list_thread):
                loop.processEvents()
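The inner waiting loop above can be isolated into a small helper. This is a minimal sketch in plain Python, assuming nothing about Qt: `wait_processing_events` and its `process_events` parameter are hypothetical names, with `process_events` standing in for `loop.processEvents()` and `time.monotonic()` replacing QElapsedTimer.

```python
import time


def wait_processing_events(delay_s, process_events, step_s=0.1):
    """Wait about delay_s seconds, calling process_events() between short sleeps.

    process_events is a hypothetical callable standing in for
    loop.processEvents(); step_s is the arbitrary polling interval.
    Returns how many times process_events was called.
    """
    calls = 0
    deadline = time.monotonic() + delay_s
    while time.monotonic() < deadline:
        process_events()
        calls += 1
        time.sleep(step_s)
    return calls


# Usage: record a timestamp on every "event processing" pass.
ticks = []
calls = wait_processing_events(
    0.3, lambda: ticks.append(time.monotonic()), step_s=0.05)
print(calls, len(ticks))
```

The total wait stays close to the requested delay, while pending events are handled at most `step_s` seconds late instead of only once the whole delay has passed.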
    

    Finally, be aware that you may also use the static QThread.sleep(), QThread.msleep() or QThread.usleep() functions for seconds, milliseconds and microseconds respectively, so that you can avoid the time imports. Note, though, that those functions only accept integers: QThread.sleep(0.5) may be interpreted as QThread.sleep(0), or even raise an exception in more recent PyQt/PySide versions.
    In any case, the above functions behave fundamentally the same as Python's time.sleep() at the OS level: for instance, if you only need to wait one second, you can just do self.sleep(1) (which is the same as QThread.sleep(1), since those are static functions).
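If a fractional delay is needed, one option is to convert it to whole milliseconds before calling QThread.msleep(). The helper below is a minimal sketch; `seconds_to_msleep_arg` is a hypothetical name, not part of PyQt, and the commented call assumes the code runs inside a QThread subclass.

```python
def seconds_to_msleep_arg(seconds):
    """Convert a possibly fractional number of seconds to whole milliseconds."""
    if seconds < 0:
        raise ValueError("sleep duration must not be negative")
    return int(round(seconds * 1000))


print(seconds_to_msleep_arg(0.5))  # 500
print(seconds_to_msleep_arg(3))    # 3000

# Inside a QThread subclass (assuming PyQt5), the result can be passed on:
#     self.msleep(seconds_to_msleep_arg(0.5))
```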