I am porting over a library for a client who is very picky about external dependencies.
Most of the multiprocessing in this library is handled by the pathos ProcessPool module, mainly because it can easily deal with locally defined functions.
I'm trying to get some of this functionality back without forcing this dependency (or having to rewrite large chunks of the library). I understand that the following code works because the function is defined at the top level:
import multiprocessing as mp
def f(x):
    return x * x
def main():
    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))
if __name__ == "__main__":
    main()
The following code (which is what I need to get working) fails because the function is only defined in the local scope:
import multiprocessing as mp
def main():
    def f(x):
        return x * x
    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))
if __name__ == "__main__":
    main()
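For context, here is a minimal sketch of why the second version fails. `multiprocessing.Pool.map` pickles the function to send it to the workers, and pickle stores a function by reference (its module plus qualified name). A nested function's qualified name contains `<locals>`, so it cannot be looked up again and pickling fails. The names `top_level`, `make_local` and `can_pickle` are hypothetical, used only for illustration.

```python
import pickle


def top_level(x):
    return x * x


def make_local():
    # Same body as top_level, but defined inside another function
    def local(x):
        return x * x
    return local


def can_pickle(func):
    """Return True if pickle can serialize func (functions are pickled by qualified name)."""
    try:
        pickle.dumps(func)
    except (pickle.PicklingError, AttributeError):
        # Depending on the Python version, a local object raises one of these
        return False
    return True


if __name__ == "__main__":
    print(can_pickle(top_level))     # found again as __main__.top_level
    print(can_pickle(make_local()))  # qualname is 'make_local.<locals>.local'
```

When run as a script, the first call is expected to succeed and the second to fail, which mirrors the difference between the two snippets above.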
In my case (not the toy example above), the objects in my generator are unmarshallable.
What would be a good workaround for this specific use case that doesn't require external dependencies?
The main problem is the closure variables.
If you don't have any, it can be done like this:
import marshal
import multiprocessing
import types
from functools import partial
def main():
    def internal_func(c):
        return c*c
    with multiprocessing.Pool(5) as pool:
        print(internal_func_map(pool, internal_func, [i for i in range(10)]))
def internal_func_map(pool, f, gen):
    marshaled = marshal.dumps(f.__code__)
    return pool.map(partial(run_func, marshaled=marshaled), gen)
def run_func(*args, **kwargs):
    marshaled = kwargs.pop("marshaled")
    func = marshal.loads(marshaled)
    restored_f = types.FunctionType(func, globals())
    return restored_f(*args, **kwargs)
if __name__ == "__main__":
    main()
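The idea is that the function's code object has everything you need in order to run it in a new process, as long as the function doesn't rely on closure variables or default arguments (globals are taken from the worker's module). Notice that no external dependencies are needed, only the standard library.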
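A minimal in-process sketch of the same round trip, without a pool, assuming the function has no closure variables. Default argument values live on the function object (`__defaults__`, `__kwdefaults__`), not on the code object, so this sketch passes them along explicitly. The names `make_func`, `scale` and `roundtrip` are hypothetical.

```python
import marshal
import types


def make_func():
    # A locally defined function with a default argument but no closure
    def scale(c, factor=2):
        return c * factor
    return scale


def roundtrip(f):
    """Serialize only f.__code__ with marshal and rebuild a function from it."""
    payload = marshal.dumps(f.__code__)  # plain bytes, which can be sent to another process
    code = marshal.loads(payload)
    # FunctionType(code, globals, name, argdefs): defaults must be supplied separately
    rebuilt = types.FunctionType(code, globals(), f.__name__, f.__defaults__)
    rebuilt.__kwdefaults__ = f.__kwdefaults__  # keyword-only defaults, if any
    return rebuilt


if __name__ == "__main__":
    rebuilt = roundtrip(make_func())
    print(rebuilt(5))     # 10, uses the default factor=2
    print(rebuilt(5, 3))  # 15
```

Without the `f.__defaults__` argument, `rebuilt(5)` would raise a `TypeError` for the missing `factor` argument.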
If closures are indeed needed, then the most difficult part of this solution is actually creating them: a closure is a tuple of "cell" objects, which are not very easy to create from code.
Here is the somewhat elaborate, but working, code:
import marshal
import multiprocessing
import pickle
import types
from functools import partial
class A:
    def __init__(self, a):
        self.a = a
def main():
    x = A(1)
    def internal_func(c):
        return x.a + c
    with multiprocessing.Pool(5) as pool:
        print(internal_func_map(pool, internal_func, [i for i in range(10)]))
def internal_func_map(pool, f, gen):
    # f.__closure__ is None when the function has no free variables
    closure = f.__closure__ or ()
    marshaled_func = marshal.dumps(f.__code__)
    # cell objects can't be pickled, but their contents can
    pickled_closure = pickle.dumps(tuple(cell.cell_contents for cell in closure))
    return pool.map(partial(run_func, marshaled_func=marshaled_func, pickled_closure=pickled_closure), gen)
def run_func(*args, **kwargs):
    marshaled_func = kwargs.pop("marshaled_func")
    func = marshal.loads(marshaled_func)
    pickled_closure = kwargs.pop("pickled_closure")
    closure = pickle.loads(pickled_closure)
    # only build cells if the code object actually has free variables
    cells = create_closure(func, closure) if func.co_freevars else None
    restored_f = types.FunctionType(func, globals(), closure=cells)
    return restored_f(*args, **kwargs)
def create_closure(func, original_closure):
    indent = " " * 4
    closure_vars_def = f"\n{indent}".join(f"{name}=None" for name in func.co_freevars)
    closure_vars_ref = ",".join(func.co_freevars)
    dynamic_closure = "create_dynamic_closure"
    s = (f"""
def {dynamic_closure}():
    {closure_vars_def}
    def internal():
        {closure_vars_ref}
    return internal.__closure__
""")
    # exec into an explicit namespace; reading the result back via locals()
    # is unreliable and no longer works as of Python 3.13 (PEP 667)
    namespace = {}
    exec(s, namespace)
    created_closure = namespace[dynamic_closure]()
    for closure_var, value in zip(created_closure, original_closure):
        closure_var.cell_contents = value
    return created_closure
if __name__ == "__main__":
    main()
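On Python 3.8 and later, the hard part of creating cells mostly goes away: `types.CellType` can be called directly to make a cell holding a given value, which could replace the `exec()` trick in `create_closure`. Below is a minimal in-process sketch, assuming Python 3.8+ and picklable closure contents; `make_closure`, `serialize` and `deserialize` are hypothetical names, and no pool is used so the round trip can be checked directly.

```python
import marshal
import pickle
import types


def make_closure(offset, scale):
    # internal_func closes over two free variables: offset and scale
    def internal_func(c):
        return offset + c * scale
    return internal_func


def serialize(f):
    """Return (marshaled code, pickled cell contents) for f."""
    cells = f.__closure__ or ()
    # f.__closure__ is ordered like f.__code__.co_freevars
    return marshal.dumps(f.__code__), pickle.dumps(tuple(cell.cell_contents for cell in cells))


def deserialize(marshaled_func, pickled_closure):
    """Rebuild the function from its code and closure values (Python 3.8+)."""
    code = marshal.loads(marshaled_func)
    values = pickle.loads(pickled_closure)
    # types.CellType(value) creates a new cell containing value
    closure = tuple(types.CellType(v) for v in values) if code.co_freevars else None
    return types.FunctionType(code, globals(), closure=closure)


if __name__ == "__main__":
    rebuilt = deserialize(*serialize(make_closure(1, 2)))
    print([rebuilt(c) for c in range(3)])  # [1, 3, 5]
```

In a pool, `serialize` would run in the parent and `deserialize` inside the worker, just like `internal_func_map` and `run_func` above.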
Hope that helps or at least gives you some ideas on how to tackle this problem!