Tags: python, python-3.x, multiprocessing, multiprocess

Using multiprocessing.Pool.map in a class


from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        self.count += i
        return self.count

a = Acc()
a.multiprocess()
print(a.count)

I expected the output to be 30, but it is 0. I don't understand how multiprocessing.Pool.map works or how it interacts with a class. Please explain in detail.

By the way, if I print self.count inside run, like this:

    def run(self, i):
        print(self.count)
        self.count += i
        return self.count

It gives

0
1
0
1
00

1
10

1
00

11

0
1
00

1001



11

0
10

10

1

Even more confusing: why is the output a mix of 0 and 1?


Solution

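  • First, let's make the printout a bit more orderly by adding flush=True to the print statement so that each print output occupies its own line (note that the label 'i =' in the output below actually shows the value of self.count, not the argument i):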

    from multiprocessing import Pool
    
    class Acc:
        def __init__(self):
            self.count = 0
    
        def multiprocess(self):
            pool = Pool(processes=4)
            result = pool.map(self.run, [1]*30)
            pool.close()
            pool.join()
    
        def run(self, i):
            print('i =', self.count, flush=True)
            self.count += i
            return self.count
    
    if __name__ == '__main__':
        a = Acc()
        a.multiprocess()
        print('a.count =', a.count)
    

    Prints:

    i = 0
    i = 1
    i = 0
    i = 1
    i = 0
    i = 1
    i = 0
    i = 0
    i = 1
    i = 0
    i = 1
    i = 0
    i = 1
    i = 0
    i = 0
    i = 1
    i = 0
    i = 1
    i = 1
    i = 0
    i = 1
    i = 0
    i = 0
    i = 1
    i = 0
    i = 1
    i = 1
    i = 0
    i = 1
    i = 1
    a.count = 0
    

    Analysis

    Now let's analyze what is happening. The object a = Acc() is created by the main process. The pool's worker processes execute in different address spaces, so when they run your worker function, the bound method self.run, object a must be serialized (pickled) in the main process and de-serialized in the address space of the process that will execute the worker function. In that new address space self.count arrives with its initial value of 0, which is printed, and is then incremented to 1 and returned. Meanwhile, in parallel, object a is being serialized/de-serialized 3 more times so that the 3 other processes can do the same processing; they, too, print 0 and return the value 1. But since all of this incrementing happens to copies of a that exist in address spaces other than the main process's address space, the original a in the main process remains unmodified. So as the map function continues to execute and a is copied again and again from the main process to the processing pool, it always goes across with self.count = 0.
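
    The copying described above can be reproduced inside a single process, without a pool. The following is a minimal sketch, assuming that a pickle round trip of the bound method is a fair stand-in for what happens when a task is sent to a worker; the names payload, worker_run and worker_copy are made up for illustration:

```python
import pickle

class Acc:
    def __init__(self):
        self.count = 0

    def run(self, i):
        self.count += i
        return self.count

a = Acc()

# Pickling the bound method a.run also pickles the instance it is bound to.
payload = pickle.dumps(a.run)

# Unpickling rebuilds the method around a brand-new Acc object,
# which is roughly what a worker process receives.
worker_run = pickle.loads(payload)
worker_copy = worker_run.__self__

print(worker_copy is a)             # False
print(worker_run(1))                # 1
print(worker_copy.count, a.count)   # 1 0
```

    The increment lands on the copy, and the original a keeps count == 0, just as in the pool example.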

    Then the question becomes: why is i = 1 sometimes printed instead of i = 0?

    When you execute map with an iterable of 30 elements, as you are doing here, these 30 tasks are divided into "chunks" whose size is given by the chunksize argument. Since we took the default chunksize=None, the map function computes a default chunksize value based on the length of the iterable and the pool size:

    chunksize, remainder = divmod(len(iterable), 4 * pool_size)
    if remainder:
        chunksize += 1
    

    In this case the pool size was 4, so divmod(30, 16) yields a quotient of 1 and a remainder of 14, and the chunksize is computed to be 2. That means that each process in the multiprocessing pool takes tasks off the task queue two at a time, and so it processes the same copy of the object twice: the first call prints 0 and increments the count to 1, and the second call prints 1.
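
    To check the arithmetic, the calculation shown above can be wrapped in a small function. This is only a sketch of that formula; default_chunksize is a hypothetical helper name, not part of the multiprocessing API:

```python
def default_chunksize(n_items, pool_size):
    # Same calculation as the snippet above: split the work into
    # roughly 4 chunks per worker, rounding the chunk size up.
    chunksize, remainder = divmod(n_items, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

print(default_chunksize(30, 4))  # 2
```

    With 30 items and 4 workers, divmod(30, 16) is (1, 14), so the chunk size is rounded up to 2.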

    If we specify a chunksize of 1, so that each process only processes the object one at a time, then we have:

    from multiprocessing import Pool
    
    class Acc:
        def __init__(self):
            self.count = 0
    
        def multiprocess(self):
            pool = Pool(processes=4)
            result = pool.map(self.run, [1]*30, chunksize=1)
            pool.close()
            pool.join()
    
        def run(self, i):
            print('i =', self.count, flush=True)
            self.count += i
            return self.count
    
    if __name__ == '__main__':
        a = Acc()
        a.multiprocess()
        print('a.count =', a.count)
    

    Prints:

    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    i = 0
    a.count = 0
    

    And if we specify a chunksize of 30, so that a single process processes all of the tasks against a single copy of the object:

    from multiprocessing import Pool
    
    class Acc:
        def __init__(self):
            self.count = 0
    
        def multiprocess(self):
            pool = Pool(processes=4)
            result = pool.map(self.run, [1]*30, chunksize=30)
            pool.close()
            pool.join()
    
        def run(self, i):
            print('i =', self.count, flush=True)
            self.count += i
            return self.count
    
    if __name__ == '__main__':
        a = Acc()
        a.multiprocess()
        print('a.count =', a.count)
    

    Prints:

    i = 0
    i = 1
    i = 2
    i = 3
    i = 4
    i = 5
    i = 6
    i = 7
    i = 8
    i = 9
    i = 10
    i = 11
    i = 12
    i = 13
    i = 14
    i = 15
    i = 16
    i = 17
    i = 18
    i = 19
    i = 20
    i = 21
    i = 22
    i = 23
    i = 24
    i = 25
    i = 26
    i = 27
    i = 28
    i = 29
    a.count = 0
    

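    In this last case, of course, no parallel processing occurred, since a single process of the multiprocessing pool processed all the submitted tasks. Note that a.count in the main process is still 0, because the increments were again applied to a copy.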
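
    If the goal is for a.count to end up as 30, one possible approach (a sketch, not the only option) is to have the workers return values and do the accumulating in the main process, where the original object lives. The module-level function work below is a made-up name introduced for this example:

```python
from multiprocessing import Pool

def work(i):
    # Runs in a worker process; it returns a value instead of
    # trying to mutate state that belongs to the main process.
    return i

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        with Pool(processes=4) as pool:
            results = pool.map(work, [1] * 30)
        # The accumulation happens in the main process, on the original object.
        self.count += sum(results)

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)  # a.count = 30
```

    Because only plain values cross the process boundary, the result no longer depends on the chunksize or on which worker handled which task.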