When I use AllocateParalle to control the maximum number of executions, no messages are generated for Task's start_output and output. However, when I run only with Task, both start_output and output work normally.
from PyQt5.QtCore import *
from PyQt5.QtWidgets import QApplication
from collections import deque
import threading
import time
import sys

class AllocateParalle(QThread):
    def __init__(self, queue_test):
        super().__init__()
        self.queue_test = queue_test
        self.activate = 0
        self.max_activate = 1
        self.time = 0
        self.lock = threading.Lock()
        self.list_thread = []

    def run(self):
        while len(self.queue_test) != 0:
            if self.activate < self.max_activate:
                with self.lock:
                    self.activate += 1
                    self.time += 1
                cmd = self.queue_test.popleft()
                thread = Task(cmd, self.time)
                thread.finished.connect(self.finished_process)
                self.list_thread.append(thread)
                thread.start()
            else:
                time.sleep(3)

    def finished_process(self):
        with self.lock:
            self.activate -= 1

class Task(QThread):
    def __init__(self, cmd, idx):
        super().__init__()
        self.lsf_task_process = None
        self.case_cmd = cmd
        self.idx = idx
        self.lock = threading.Lock()

    def run(self):
        self.lsf_task_process = QProcess()
        self.lsf_task_process.setProcessChannelMode(QProcess.MergedChannels)
        env = QProcessEnvironment.systemEnvironment()
        env.insert('PATH', '/usr/bin:' + env.value('PATH'))
        self.lsf_task_process.setProcessEnvironment(env)
        self.lsf_task_process.readyReadStandardOutput.connect(self.output)
        self.lsf_task_process.started.connect(self.start_output)
        self.lsf_task_process.finished.connect(lambda exitcode: self.process_finished(exitcode))
        self.lsf_task_process.start('bash', ['-c', self.case_cmd])
        self.lsf_task_process.waitForFinished(-1)

    def start_output(self):
        print('start remote')

    def output(self):
        while self.lsf_task_process.canReadLine():
            line_data = self.lsf_task_process.readLine().data().decode('utf-8', errors='replace').strip()
            print(f'[{self.idx}] {line_data}')

    def process_finished(self, exitcode):
        if exitcode != 0:
            print(f'[{self.idx}] error')
        else:
            print(f'[{self.idx}]success')

if __name__ == "__main__":
    app = QApplication(sys.argv)
    queue = deque()
    cmd = "for i in {1..5}; do echo 'Hello, world!'; sleep 1; done"
    for i in range(3):
        queue.appendleft(cmd)
    thread = AllocateParalle(queue)
    # thread = Task(cmd, 0)
    thread.start()
    sys.exit(app.exec())
I tried to call output from process_finished, but this results in all messages being printed at once, without real-time updates, which is not what I intended.
    def process_finished(self, exitcode):
        self.output()
        if exitcode != 0:
            print(f'[{self.idx}] error')
        else:
            print(f'[{self.idx}]success')
The problem is caused by the hierarchy of QThreads and how their signals are eventually processed based on their thread affinity:
When a QObject receives a queued signal or a posted event, the slot or event handler will run in the thread that the object lives in.
When you just create and start the Task thread, its affinity is with the object itself, meaning that signals created in that thread will call slots in that same thread, similar to using connect(<signal>, Qt.DirectConnection).
When the Task objects are created within AllocateParalle, their affinity is based on that object, even though the connection is created within the thread in which their run is executed. This seems a bit counter-intuitive, but it is probably caused by the way PyQt interacts with thread affinity detection and object scope, considering that you're creating "nested" threads.
Note that there is no such thing as "nesting" in multi-threading, but Qt uses the QObject parent/child tree to verify thread affinity, so it follows a similar pattern to determine in which thread a slot actually lives.
When using the "one Task object" approach, only two threads exist, and the slots are actually executed in the main thread: that thread has its own event loop and is then able to process those signals.
When using the "sub-Task objects" approach, the slots are executed in the AllocateParalle
thread, which does not have an event loop (because its run
has been overridden), and then the signals connected to its slots are queued and never processed.
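If you want to verify this yourself, here is a minimal diagnostic sketch of mine (based on the Task class above, everything else unchanged): compare, inside run(), the thread that is actually executing with the thread the object lives in. The latter is where queued slot calls are delivered, and in the nested case that is the AllocateParalle thread, which never processes them.
class Task(QThread):
    ...
    def run(self):
        # QThread.currentThread() is the thread currently executing run(),
        # while self.thread() is the thread this object lives in, i.e. the
        # thread in which queued slot calls would actually be processed
        print(f'[{self.idx}] running in: {QThread.currentThread()}, '
              f'slots queued to: {self.thread()}')
        ...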
Here are some possible ways to work around that.
A first option is to force the connection type, using Qt.DirectConnection:
        self.lsf_task_process.readyReadStandardOutput.connect(
            self.output, Qt.DirectConnection)
        self.lsf_task_process.started.connect(
            self.start_output, Qt.DirectConnection)
Alternatively, you can use a lambda, which ensures that the function is always executed within the same thread in which the connection was created, circumventing thread affinity:
        self.lsf_task_process.readyReadStandardOutput.connect(
            lambda: self.output())
        self.lsf_task_process.started.connect(
            lambda: self.start_output())
Interestingly enough, that is also the reason why the finished connection in your code works: it is made through a lambda.
A second option is to force the thread to be "moved to itself", which ensures that the connected slots created within the thread are executed in that same thread:
class Task(QThread):
    def __init__(self, cmd, idx):
        ...
        self.moveToThread(self)
Note that this practice is normally discouraged (at least in the C++ world), but it may be acceptable in Python under certain circumstances. You must be aware of its possible consequences, but as long as QThread is subclassed by overriding run() in simple ways, there should be no harm in doing this.
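For clarity, this is a sketch of how that change would look when applied to the Task class of the question (nothing else needs to change):
class Task(QThread):
    def __init__(self, cmd, idx):
        super().__init__()
        self.lsf_task_process = None
        self.case_cmd = cmd
        self.idx = idx
        self.lock = threading.Lock()
        # the object now lives in its own thread, so the connections
        # made in run() are resolved as direct connections
        self.moveToThread(self)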
A third option, creating an event loop within run(), is a slightly more advanced concept, and probably not always desirable, but it follows the QThread concept and is, in my opinion, a bit more elegant.
The idea is relatively simple: create a QEventLoop within run() and process its events whenever required. This is fundamentally what QThread does on its own when run() is not reimplemented: the default implementation calls exec(), which creates an event loop and starts it.
The difference is that we have to manually call processEvents() at regular "intervals", because that is what this implementation requires: first, do it every time the original while loop iterates; then keep doing it until every thread has finished.
class AllocateParalle(QThread):
    ...
    def run(self):
        loop = QEventLoop()
        while len(self.queue_test) != 0:
            if self.activate < self.max_activate:
                with self.lock:
                    self.activate += 1
                    self.time += 1
                cmd = self.queue_test.popleft()
                thread = Task(cmd, self.time)
                thread.finished.connect(self.finished_process)
                self.list_thread.append(thread)
                thread.start()
            else:
                time.sleep(1)
            loop.processEvents()
        while any(thread.isRunning() for thread in self.list_thread):
            loop.processEvents()
Note that I reduced the time.sleep() value to 1 for consistency with the bash script; reverting it to 3 will result in inconsistent print output (the process_finished slot will be called before all pending signals have been processed). With your current script, the smaller value is recommended: it is better to check many times at short intervals than to wait for an arbitrary, longer delay.
Still, there are more complex situations in which a longer interval may be required; in that case, you need to slightly modify the code above so that it consumes the interval while still processing events.
class AllocateParalle(QThread):
    ...
    def run(self):
        et = QElapsedTimer()
        waitDelay = 3000  # three seconds
        loop = QEventLoop()
        while len(self.queue_test) != 0:
            if self.activate < self.max_activate:
                with self.lock:
                    self.activate += 1
                    self.time += 1
                cmd = self.queue_test.popleft()
                thread = Task(cmd, self.time)
                thread.finished.connect(self.finished_process)
                self.list_thread.append(thread)
                thread.start()
            else:
                et.start()
                while et.elapsed() < waitDelay:
                    loop.processEvents()
                    # arbitrary value, it could be 1 in this case
                    time.sleep(.1)
            loop.processEvents()
        while any(thread.isRunning() for thread in self.list_thread):
            loop.processEvents()
Finally, be aware that you may also use the static QThread.sleep(), QThread.msleep() or QThread.usleep() functions for seconds, milliseconds and microseconds respectively, so that you can avoid the time import. Note, though, that those functions only accept integers: QThread.sleep(0.5) may be interpreted as QThread.sleep(0), or even raise an exception in more recent PyQt/PySide versions.
In any case, those functions are fundamentally identical in OS behavior to their Python time.sleep() counterpart: for instance, if you only need to wait one second, you can just call self.sleep(1) (which is the same as QThread.sleep(1), since those are static functions).
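For instance, the pauses used in the examples above could be replaced as in the following sketch (the millisecond value is simply the integer equivalent of the .1 second pause):
        # inside AllocateParalle.run(); since these are static functions,
        # calling them on self, on QThread or on the class name is equivalent
        self.sleep(1)        # instead of time.sleep(1)
        self.msleep(100)     # instead of time.sleep(.1)
        QThread.msleep(100)  # same thing, called through the class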