
Multiprocess pool initialization with sequential initializer argument


I have some code like the following:

import multiprocessing as mp

connection: module.Connection

def client_id():
    for i in range(mp.cpu_count()*2):
        yield i

def initproc(host: str, port: int, client_id: int):
    global connection
    connection.connect(host, port, client_id)

def main():
    host = "something"
    port = 12345
    with mp.get_context("spawn").Pool(processes=mp.cpu_count()*2,
                                      initializer=initproc,
                                      initargs=(host, port, client_id())) as p:
        res = p.starmap(processing_function, arg_list)
    

For the purposes of this question, processing_function and arg_list are not relevant.

The issue is that I get an error with this:

    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'generator' object
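Pool hands the same initargs tuple to every worker; with the "spawn" start method it is pickled when each worker is started, and generator objects do not support pickling. Even if they did, each worker would unpickle its own independent copy, so every worker would draw the first value rather than the next one. A minimal sketch reproducing the failure without a pool (try_pickle is a hypothetical helper, for illustration only):

```python
import pickle


def client_id():
    # Same kind of generator as in the question, with a fixed bound for the demo.
    for i in range(4):
        yield i


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the TypeError message."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(try_pickle(client_id()))     # generator: pickling fails
    print(try_pickle(list(range(4))))  # plain list: pickles fine, prints None
```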

Is there any way to create and initialize the processes in the pool so that one of the arguments passed to each initializer is the next number in a sequence?

P.S. In the code as written, it may be possible to initialize all connection objects outside the initializer function, but in my particular case it is not. I need to pass the connection arguments into the initializer.


Solution

  • A simple solution in your case is to use the sequential number of the child process, which is part of its Process.name (e.g. SpawnPoolWorker-3). You could extract it with:

    mp.current_process().name.split('-')[1]
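    A minimal sketch of the process-name approach, assuming the default "<Prefix>-<N>" naming (e.g. SpawnPoolWorker-3). The name yields the number as a string, so it is converted with int(). worker_number, init_worker and get_client_id are illustrative names, not part of multiprocessing.

```python
import multiprocessing as mp

client_id = None  # set in each worker process by init_worker


def worker_number(name: str) -> int:
    """Return N from a process name of the form '<Prefix>-<N>'."""
    # Split on the last hyphen only; int() turns the string suffix into a number.
    return int(name.rsplit("-", 1)[1])


def init_worker():
    global client_id
    client_id = worker_number(mp.current_process().name)


def get_client_id(_):
    # Runs in a worker and reports the id that worker derived from its name.
    return client_id


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=4, initializer=init_worker) as pool:
        # Collect the distinct ids seen across many small tasks.
        print(sorted(set(pool.map(get_client_id, range(100)))))
```

    The number comes from a counter kept by the parent process, so it is unique per worker but need not start at 1: it keeps counting if the parent has already started other processes, and a replaced worker gets a new, higher number.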
    

    If you need more control over where the sequence starts, you could use a multiprocessing.Value as a shared counter from which each worker takes its unique number.

    import multiprocessing as mp
    import time
    
    
    def init_p(counter):
        global client_id  # per-worker id, readable later by task functions
        with counter.get_lock():  # serialize read-and-increment across workers
            client_id = counter.value
            print(f"{mp.current_process().name},"
                  f" {mp.current_process().name.split('-')[1]},"  # alternative
                  f" client_id:{client_id}")
            counter.value += 1
    
    
    if __name__ == "__main__":
    
        ctx = mp.get_context("spawn")
        client_ids = ctx.Value('i', 0)
    
        with ctx.Pool(
                processes=4,
                initializer=init_p,
                initargs=(client_ids,)
        ) as pool:
    
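            # Keep the pool alive long enough for all workers to start and run
            # init_p; leaving the with-block calls pool.terminate().
            time.sleep(3)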
    

    Output:

    SpawnPoolWorker-2, 2, client_id:0
    SpawnPoolWorker-3, 3, client_id:1
    SpawnPoolWorker-1, 1, client_id:2
    SpawnPoolWorker-4, 4, client_id:3
    
    Process finished with exit code 0