I have a task function like this:
    def task(s):
        # do something
        return res
The original program is:
    res = []
    for i in data:
        res.append(task(i))
        # using pickle to save res every 30s
I need to process a lot of data and I don't care about the output order of the results. Because the run takes a long time, I need to save the current progress regularly. Now I want to change it to use multiprocessing:
    from multiprocessing import Pool

    pool = Pool(4)
    status = []
    res = []
    for i in data:
        status.append(pool.apply_async(task, (i,)))
    for i in status:
        res.append(i.get())
        # using pickle to save res every 30s
Suppose I have processes p0, p1, p2, p3 in the Pool and 10 tasks (task(0) ... task(9)), and p0 takes a very long time to finish task(0).
I updated my code, but the main process blocked somewhere while the other processes were still handling tasks. What's wrong? Here is the core of the code:
    with concurrent.futures.ProcessPoolExecutor(4) as ex:
        for i in self.inBuffer:
            futuresList.append(ex.submit(warpper, i))
        for i in concurrent.futures.as_completed(futuresList):
            (word, r) = i.result()
            self.resDict[word] = r
            self.logger.info("{} --> {}".format(word, r))
            cur = datetime.now()
            if (cur - self.timeStmp).total_seconds() > 30:
                self.outputPickle()
                self.timeStmp = datetime.now()
The length of self.inBuffer is about 100000. self.logger.info writes the info to a log file. For some special inputs i, the warpper function prints auxiliary information with print. self.resDict is a dict that stores the results. self.outputPickle() writes a .pkl file using pickle.dump.
At first the code ran normally: both the log file and the output printed by warpper kept updating. But at some point I noticed that the log file had not been updated for a long time (several hours, while a single warpper call should not take more than 120 s), yet warpper was still printing information (it printed about 100 messages, without any update to the log file, before I killed the process). Also, the timestamp of the output .pkl file did not change. Here is the implementation of outputPickle():
    def outputPickle(self):
        if os.path.exists(os.path.join(self.wordDir, self.outFile)):
            if os.path.exists(os.path.join(self.wordDir, "{}_backup".format(self.outFile))):
                os.remove(os.path.join(self.wordDir, "{}_backup".format(self.outFile)))
            shutil.copy(os.path.join(self.wordDir, self.outFile), os.path.join(self.wordDir, "{}_backup".format(self.outFile)))
        with open(os.path.join(self.wordDir, self.outFile), 'wb') as f:
            pickle.dump(self.resDict, f)
Then I added three print calls:
    print("getting res of something")
    (word, r) = i.result()
    print("finishing i.result")
    self.resDict[word] = r
    print("finished getting res of {}".format(word))
Here is the log:
getting res of something
finishing i.result
finished getting res of CNICnanotubesmolten
getting res of something
finishing i.result
finished getting res of CNN0
getting res of something
message by warpper
message by warpper
message by warpper
message by warpper
message by warpper
The line "message by warpper" is printed at most once per call to warpper.
Yes
Yes, as tasks are submitted asynchronously. Also, p1 (or another worker) will pick up the next task if the input iterable is larger than the maximum number of processes/workers.
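The scenario from the question can be checked with a small experiment. This is only a sketch: slow_task and run_in_order are hypothetical names and the delays are arbitrary. The first task is made slow, so the first get() blocks the main process, while the timestamps show that the remaining tasks were completed by the other worker in the meantime:

```python
import time
from multiprocessing import Pool


def slow_task(args):
    index, delay = args
    time.sleep(delay)
    return index, time.time()  # wall-clock time at which this task finished


def run_in_order(pool, delays):
    handles = [pool.apply_async(slow_task, ((i, d),)) for i, d in enumerate(delays)]
    # get() is called in submission order, so the loop waits on task 0 first
    return [h.get() for h in handles]


if __name__ == "__main__":
    with Pool(2) as pool:
        finished = run_in_order(pool, [0.5, 0.05, 0.05, 0.05])
    t0 = finished[0][1]
    for index, t in finished[1:]:
        print("task {} finished {:.2f}s before task 0".format(index, t0 - t))
```

The results still come back in submission order; only their availability to the main process is delayed by the slow first task.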
"... how to get other results in advance"
One convenient option is to rely on concurrent.futures.as_completed, which yields the futures as they complete:
    import time
    import concurrent.futures

    def func(x):
        time.sleep(3)
        return x ** 2

    if __name__ == '__main__':
        data = range(1, 5)
        results = []
        with concurrent.futures.ProcessPoolExecutor(4) as ex:
            futures = [ex.submit(func, i) for i in data]
            # processing the earlier results: as they are completed
            for fut in concurrent.futures.as_completed(futures):
                res = fut.result()
                results.append(res)
                print(res)
Sample output (the order may differ between runs):
4
1
9
16
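Since the question also needs to save progress periodically, the as_completed loop can be combined with a time-based checkpoint. The following is a minimal sketch under assumptions: collect_with_checkpoints and save_checkpoint are hypothetical helpers, progress.pkl is a made-up file name, and the interval is shortened for the demo (the question uses 30 s). It dumps to a temporary file and then replaces the old one, so an interrupted dump does not damage the previous checkpoint:

```python
import concurrent.futures
import os
import pickle
import tempfile
import time


def func(x):
    time.sleep(0.1)
    return x ** 2


def save_checkpoint(results, path):
    # Dump to a temporary file first, then swap it in place of the old checkpoint
    tmp = path + ".tmp"
    with open(tmp, "wb") as f:
        pickle.dump(results, f)
    os.replace(tmp, path)


def collect_with_checkpoints(futures, path, interval=30.0):
    """Gather results as they complete, pickling them every `interval` seconds."""
    results = []
    last_save = time.monotonic()
    for fut in concurrent.futures.as_completed(futures):
        results.append(fut.result())
        if time.monotonic() - last_save > interval:
            save_checkpoint(results, path)
            last_save = time.monotonic()
    save_checkpoint(results, path)  # final save once everything is done
    return results


if __name__ == "__main__":
    path = os.path.join(tempfile.gettempdir(), "progress.pkl")
    with concurrent.futures.ProcessPoolExecutor(4) as ex:
        futures = [ex.submit(func, i) for i in range(1, 9)]
        results = collect_with_checkpoints(futures, path, interval=0.05)
    print(sorted(results))
```

Using time.monotonic() rather than wall-clock time keeps the interval check unaffected by system clock adjustments.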
Another option is to use the callback parameter of the apply_async(func[, args[, kwds[, callback[, error_callback]]]]) call; the callback accepts a single argument, which is the value returned by the function. In that callback you can do minimal processing of the result (keeping in mind that it only ever sees a single result from one function call). The general scheme looks as follows:
    from multiprocessing import Pool

    # func is defined as in the previous example

    def res_callback(v):
        # ... processing result
        with open('test.txt', 'a') as f:  # just an example
            f.write(str(v))
        print(v, flush=True)

    if __name__ == '__main__':
        data = range(1, 5)
        results = []
        with Pool(4) as pool:
            tasks = [pool.apply_async(func, (i,), callback=res_callback) for i in data]
            # wait for the tasks to finish; leaving the with block terminates the pool
            for t in tasks:
                t.wait()
Note that this scheme still has to wait for the submitted tasks in some way (for example, by calling get() on each result), because the pool is terminated when the with block exits.
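One way to do that waiting, assuming the results are only consumed through the callback, is to call pool.close() followed by pool.join(), which blocks until every submitted task has finished. In this sketch run_all and collected are hypothetical names; the callback runs in a helper thread of the parent process, so it can append to an ordinary list:

```python
import time
from multiprocessing import Pool


def func(x):
    time.sleep(0.1)
    return x ** 2


def run_all(pool, data):
    collected = []

    def res_callback(v):
        # Runs in the parent process (result-handling thread); keep it short
        collected.append(v)

    for i in data:
        pool.apply_async(func, (i,), callback=res_callback)
    pool.close()  # no more tasks will be submitted
    pool.join()   # block until every submitted task has finished
    return collected


if __name__ == "__main__":
    with Pool(4) as pool:
        print(sorted(run_all(pool, range(1, 5))))  # [1, 4, 9, 16]
```

The list is sorted before printing because the callbacks fire in completion order, which is not guaranteed to match submission order.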