I want to run a parallel computation on input data that is loaded from a file. (The file can be really big, so I read it with a generator.)
Up to a certain number of items my code runs fine, but above that threshold the program hangs: some of the worker processes never finish.
Any suggestions? (I am running this with Python 2.7 on 8 CPUs; 5,000 lines still works, 7,500 does not.)
Firstly, you need an input file. Generate it in bash:
for i in {0..10000}; do echo -e "$i"'\r' >> counter.txt; done
Then, run this:
python2.7 main.py 100 counter.txt > run_log.txt
main.py:
#!/usr/bin/python2.7
import os, sys, signal, time
import Queue
import multiprocessing as mp

def eat_queue(job_queue, result_queue):
    """Eats input queue, feeds output queue
    """
    proc_name = mp.current_process().name
    while True:
        try:
            job = job_queue.get(block=False)
            if job == None:
                print(proc_name + " DONE")
                return
            result_queue.put(execute(job))
        except Queue.Empty:
            pass

def execute(x):
    """Does the computation on the input data
    """
    return x*x

def save_result(result):
    """Saves results in a list
    """
    result_list.append(result)

def load(ifilename):
    """Generator reading the input file and
    yielding it row by row
    """
    ifile = open(ifilename, "r")
    for line in ifile:
        line = line.strip()
        num = int(line)
        yield (num)
    ifile.close()
    print("file closed".upper())

def put_tasks(job_queue, ifilename):
    """Feeds the job queue
    """
    for item in load(ifilename):
        job_queue.put(item)
    for _ in range(get_max_workers()):
        job_queue.put(None)

def get_max_workers():
    """Returns optimal number of processes to run
    """
    max_workers = mp.cpu_count() - 2
    if max_workers < 1:
        return 1
    return max_workers

def run(workers_num, ifilename):
    job_queue = mp.Queue()
    result_queue = mp.Queue()
    # decide how many processes are to be created
    max_workers = get_max_workers()
    print "processes available: %d" % max_workers
    if workers_num < 1 or workers_num > max_workers:
        workers_num = max_workers
    workers_list = []
    # a process for feeding job queue with the input file
    task_gen = mp.Process(target=put_tasks, name="task_gen",
                          args=(job_queue, ifilename))
    workers_list.append(task_gen)
    for i in range(workers_num):
        tmp = mp.Process(target=eat_queue, name="w%d" % (i+1),
                         args=(job_queue, result_queue))
        workers_list.append(tmp)
    for worker in workers_list:
        worker.start()
    for worker in workers_list:
        worker.join()
        print "worker %s finished!" % worker.name

if __name__ == '__main__':
    result_list = []
    args = sys.argv
    workers_num = int(args[1])
    ifilename = args[2]
    run(workers_num, ifilename)
This is because nothing in your code takes anything off result_queue. The behavior then depends on internal queue buffering details: if "not a lot" of data is waiting, everything appears fine, but if "a lot" of data is waiting, everything freezes. Not much more can be said, because it involves layers of internal magic ;-) But the docs do warn about it:
Warning
As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.
This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
Note that a queue created using a manager does not have this issue. See Programming guidelines.
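If you want to watch that warning happen in isolation, here is a minimal sketch (it is not your program). Assumptions on my part: it is written for Python 3, it pins the "fork" start method to mirror what Python 2.7 does on Linux, and the names producer and demo, the payload size, and the 1-second timeout are all made up for illustration.

```python
import multiprocessing as mp

# Assumption: POSIX system; "fork" mirrors the Python 2.7 behaviour on Linux.
ctx = mp.get_context("fork")


def producer(result_queue, n_items):
    """Put n_items payloads on the queue, then return immediately."""
    for _ in range(n_items):
        result_queue.put("x" * 1000)


def demo(n_items=5000):
    result_queue = ctx.Queue()
    child = ctx.Process(target=producer, args=(result_queue, n_items))
    child.start()

    # Join WITHOUT draining first. producer() has long since returned, but
    # the child cannot exit until its buffered items are flushed to the
    # pipe, and nobody is reading the pipe yet.
    child.join(timeout=1.0)
    still_alive = child.is_alive()

    # Now consume everything; after that the child is free to terminate.
    received = 0
    for _ in range(n_items):
        result_queue.get()
        received += 1
    child.join()
    return still_alive, received, child.exitcode
```

With a few megabytes of payload, as here, I'd expect the first returned value to be True: the child is still alive after the timed join and only goes away once the parent has emptied the queue. With a handful of tiny items it would likely exit right away, which is the "not a lot" case.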
One easy way to repair that: first add
result_queue.put(None)
before eat_queue() returns. Then add:
count = 0
while count < workers_num:
    if result_queue.get() is None:
        count += 1
before the main program .join()s the workers. That drains the result queue, and everything shuts down cleanly then.
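Put together, the repair could look roughly like the sketch below. Treat it as an illustration under assumptions, not a drop-in replacement: it is Python 3, it pins the "fork" start method (POSIX only), it feeds the job queue from an in-memory iterable instead of your file, it uses a blocking get() in the worker, and it keeps the results in a list (your original discards them).

```python
import multiprocessing as mp

# Assumption: POSIX system; "fork" mirrors the Python 2.7 behaviour on Linux.
ctx = mp.get_context("fork")


def execute(x):
    return x * x


def eat_queue(job_queue, result_queue):
    """Process jobs until the None sentinel, then report 'done'."""
    while True:
        job = job_queue.get()
        if job is None:
            break
        result_queue.put(execute(job))
    result_queue.put(None)  # tells the parent this worker has finished


def put_tasks(job_queue, items, workers_num):
    """Feed all items, then one None sentinel per worker."""
    for item in items:
        job_queue.put(item)
    for _ in range(workers_num):
        job_queue.put(None)


def run(items, workers_num=4):
    job_queue = ctx.Queue()
    result_queue = ctx.Queue()
    procs = [ctx.Process(target=put_tasks,
                         args=(job_queue, items, workers_num))]
    procs += [ctx.Process(target=eat_queue, args=(job_queue, result_queue))
              for _ in range(workers_num)]
    for p in procs:
        p.start()

    # Drain the result queue BEFORE joining: stop once every worker
    # has sent its None sentinel.
    results = []
    finished = 0
    while finished < workers_num:
        result = result_queue.get()
        if result is None:
            finished += 1
        else:
            results.append(result)

    for p in procs:
        p.join()
    return results
```

Calling run(range(20000), workers_num=3) should come back with 20,000 squares. They arrive in whatever order the workers produced them, so sort them if order matters.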
BTW, this code is pretty bizarre:
while True:
    try:
        job = job_queue.get(block=False)
        if job == None:
            print(proc_name + " DONE")
            return
        result_queue.put(execute(job))
    except Queue.Empty:
        pass
Why are you doing non-blocking get()? This turns into a CPU-hog "busy loop" so long as the queue is empty. The primary point of .get() is to supply an efficient way to wait for work to show up. So:
while True:
    job = job_queue.get()
    if job is None:
        print(proc_name + " DONE")
        break
    else:
        result_queue.put(execute(job))
result_queue.put(None)
does the same thing, but far more efficiently.
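To put a rough number on the busy loop, here is a small single-process sketch that counts how many times the non-blocking version spins while it waits for one item. It is Python 3 (the module is spelled queue there, Queue in 2.7), and polling_get, blocking_get, put_later, and the 0.2-second delay are hypothetical names and values chosen just for the demonstration.

```python
import multiprocessing as mp
import queue        # spelled `Queue` on Python 2
import threading


def polling_get(job_queue):
    """Non-blocking loop as in the question; also counts wasted polls."""
    wasted_polls = 0
    while True:
        try:
            return job_queue.get(block=False), wasted_polls
        except queue.Empty:
            wasted_polls += 1  # nothing there yet: spin and try again


def blocking_get(job_queue):
    """Plain blocking get(): waits quietly until an item shows up."""
    return job_queue.get()


def put_later(job_queue, item, delay=0.2):
    """Hypothetical helper: put `item` on the queue after `delay` seconds."""
    timer = threading.Timer(delay, job_queue.put, args=(item,))
    timer.start()
    return timer


def compare():
    job_queue = mp.Queue()

    timer = put_later(job_queue, "job-1")
    job_a, wasted_polls = polling_get(job_queue)
    timer.join()

    timer = put_later(job_queue, "job-2")
    job_b = blocking_get(job_queue)
    timer.join()

    return job_a, wasted_polls, job_b
```

Both calls hand back their job, but the polling version burns through a pile of failed get() attempts during the 0.2 seconds of waiting (the exact count depends on the machine), while the blocking version makes exactly one call.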
Queue size caution
You didn't ask about this, but let's cover it before it bites you ;-) By default, there is no bound on a Queue's size. If, e.g., you add a billion items to the Queue, it will demand enough RAM to hold a billion items. So if your producer(s) can generate work items faster than your consumer(s) can process them, memory use can get out of hand quickly.
Fortunately, that's easy to repair: specify a maximum queue size. For example,
job_queue = mp.Queue(maxsize=10*workers_num)
                     ^^^^^^^^^^^^^^^^^^^^^^
Then job_queue.put(some_work_item) will block whenever the queue is full, until consumers reduce the size of the queue to less than the maximum. This way you can process enormous problems with a queue that requires trivial RAM.
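A quick way to convince yourself that the bound is enforced, sketched for Python 3 in a single process: the hypothetical helper fill_until_full uses put_nowait() rather than put(), purely so the demo raises queue.Full at the limit instead of blocking forever with no consumer around.

```python
import multiprocessing as mp
import queue        # spelled `Queue` on Python 2


def fill_until_full(maxsize):
    """Put items until the bounded queue refuses one; return what fit."""
    job_queue = mp.Queue(maxsize=maxsize)
    accepted = 0
    try:
        while True:
            # put_nowait() raises queue.Full where put() would block.
            job_queue.put_nowait(accepted)
            accepted += 1
    except queue.Full:
        pass

    # Take everything back off so nothing is left buffered at exit.
    drained = [job_queue.get() for _ in range(accepted)]
    return accepted, drained
```

For example, fill_until_full(8) accepts exactly 8 items and then hits the limit. In real code you'd keep the plain blocking put(): the producer simply pauses there until a worker takes something off the queue.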