python, multiprocessing, pool, apply-async

how to "poll" python multiprocess pool apply_async


I have a task function like this:

def task(s):
    # do some work on s and compute res
    return res

The original program is:

res = []
for i in data:
    res.append(task(i))
    # using pickle to save res every 30s
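
A minimal sketch of what that periodic save could look like (the output file name is just a placeholder):

import time
import pickle

res = []
last_save = time.time()
for i in data:
    res.append(task(i))
    # checkpoint the collected results every 30 seconds
    if time.time() - last_save > 30:
        with open('progress.pkl', 'wb') as f:   # placeholder file name
            pickle.dump(res, f)
        last_save = time.time()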

I need to process a lot of data and I don't care about the output order of the results. Because of the long running time, I need to save the current progress regularly. Now I want to change it to multiprocessing:

from multiprocessing import Pool

pool = Pool(4)
status = []
res = []
for i in data:
    status.append(pool.apply_async(task, (i,)))

for i in status:
    res.append(i.get())
    # using pickle to save res every 30s

Suppose I have processes p0, p1, p2, p3 in the Pool and 10 tasks (task(0) ... task(9)), and p0 takes a very long time to finish task(0).

  1. Will the main process be blocked at the first "res.append(i.get())"?
  2. If p1 finishes task(1) while p0 is still dealing with task(0), will p1 go on to deal with task(4) or a later one?
  3. If the answer to the first question is yes, how can I get the other results first and only fetch the result of task(0) at the end?

I updated my code, but the main process gets blocked somewhere while the other processes are still dealing with tasks. What's wrong? Here is the core of the code:

with concurrent.futures.ProcessPoolExecutor(4) as ex:
    for i in self.inBuffer:
        futuresList.append(ex.submit(warpper, i))

    for i in concurrent.futures.as_completed(futuresList):
        (word, r) = i.result()
        self.resDict[word] = r
        self.logger.info("{} --> {}".format(word, r))
        cur = datetime.now()
        if (cur - self.timeStmp).total_seconds() > 30:
            self.outputPickle()
            self.timeStmp = datetime.now()

The length of self.inBuffer is about 100000. self.logger.info writes the info to a log file. For some special inputs i, the warpper function prints auxiliary information with print. self.resDict is a dict that stores the results. self.outputPickle() writes a .pkl file using pickle.dump.

At first the code ran normally: both the log file and warpper's print output were being updated. But at some point I found that the log file had not been updated for a long time (several hours, while a single warpper call should never take more than 120s), even though warpper was still printing information (until I killed the process it printed about 100 messages without any update of the log file). Also, the timestamp of the output .pkl file didn't change. Here is the implementation of outputPickle():

    def outputPickle(self):
        # keep a backup of the previous pickle before overwriting it
        if os.path.exists(os.path.join(self.wordDir, self.outFile)):
            if os.path.exists(os.path.join(self.wordDir, "{}_backup".format(self.outFile))):
                os.remove(os.path.join(self.wordDir, "{}_backup".format(self.outFile)))
            shutil.copy(os.path.join(self.wordDir, self.outFile), os.path.join(self.wordDir, "{}_backup".format(self.outFile)))

        with open(os.path.join(self.wordDir, self.outFile), 'wb') as f:
            pickle.dump(self.resDict, f)

Then I added three print calls:

                print("getting res of something")
                (word, r) = i.result()
                print("finishing i.result")
                self.resDict[word] = r
                print("finished getting res of {}".format(word))

Here is the log:

getting res of something
finishing i.result
finished getting res of CNICnanotubesmolten
getting res of something
finishing i.result
finished getting res of CNN0
getting res of something
message by warpper
message by warpper
message by warpper
message by warpper
message by warpper

The log "message by warpper" can be printed at most once every time the warpper is called


Solution

    1. Yes, i.get() blocks until the result of that particular task is available, so the loop waits on task(0) even if later tasks have already finished.

    2. Yes, since the tasks are submitted asynchronously. Also, p1 (or another worker) will take another chunk of data if the size of the input iterable is larger than the max number of processes/workers.

    3. "... how to get other results in advance"
      One of the convenient options is to rely on concurrent.futures.as_completed, which returns the results as they are completed:


    import time
    import concurrent.futures
    
    
    def func(x):
        time.sleep(3)
        return x ** 2
    
    
    if __name__ == '__main__':
        data = range(1, 5)
        results = []
    
        with concurrent.futures.ProcessPoolExecutor(4) as ex:
            futures = [ex.submit(func, i) for i in data]
            # processing the earlier results: as they are completed
            for fut in concurrent.futures.as_completed(futures):
                res = fut.result()
                results.append(res)
                print(res)
    

    Sample output:

    4
    1
    9
    16
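
    If you want to stay with multiprocessing.Pool, you can also poll the AsyncResult objects returned by apply_async yourself: ready() is a non-blocking check, so you can collect whatever has finished and, for example, save progress in between. A minimal sketch (func is the same as in the example above):

    import time
    from multiprocessing import Pool


    def func(x):
        time.sleep(3)
        return x ** 2


    if __name__ == '__main__':
        data = range(1, 5)
        results = []

        with Pool(4) as pool:
            pending = [pool.apply_async(func, (i,)) for i in data]
            while pending:
                still_pending = []
                for t in pending:
                    if t.ready():                 # non-blocking check
                        results.append(t.get())   # returns immediately once ready()
                    else:
                        still_pending.append(t)
                pending = still_pending
                if pending:
                    time.sleep(0.5)               # avoid busy-waiting; a good place to save progress
        print(results)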
    

    Another option is to use the callback on the apply_async(func[, args[, kwds[, callback[, error_callback]]]]) call; the callback accepts only a single argument, the returned result of the function. In that callback you can process the result in a minimal way (considering that it's tied to only a single argument/result from a concrete function call). The general scheme (reusing func from the example above) looks as follows:

    from multiprocessing import Pool


    def res_callback(v):
        # ... processing result
        with open('test.txt', 'a') as f:  # just an example
            f.write(str(v))
        print(v, flush=True)
    
    
    if __name__ == '__main__':
        data = range(1, 5)
        results = []
        with Pool(4) as pool:
            tasks = [pool.apply_async(func, (i,), callback=res_callback) for i in data]
            # await for tasks finished
    

    But that scheme still requires you to somehow wait for (get() the results of) the submitted tasks before the pool goes away, for example:
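
    A minimal way to do that with the snippet above (reusing func and res_callback) is to call wait() on each AsyncResult, or to close() and join() the pool, before leaving the with-block:

    if __name__ == '__main__':
        data = range(1, 5)
        with Pool(4) as pool:
            tasks = [pool.apply_async(func, (i,), callback=res_callback) for i in data]
            # block until every submitted task has finished; wait() does not
            # re-raise worker exceptions, so use t.get() if you also need the values
            for t in tasks:
                t.wait()
            # alternatively: pool.close(); pool.join()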