python pickle python-multiprocessing

How do I avoid a pickling error whilst defining a function within a function?


I am trying to create a list of functions that act as controllers for a machine learning project.

Each function in the list is created at instantiation with its own parameters, and each one then has to be run an arbitrary number of times with one or more arguments to evaluate it. This is slow, so I have to parallelize it, but I'm running into pickling issues.

I have reduced the script to this:

import multiprocessing as mp

class task():
    def parallelizationWrap(self):
        poolSize = 5
        with mp.Pool(poolSize) as pool:
            for _ in pool.imap(self.parallelizationFunc, range(poolSize)):
                pass    

    def serialWrap(self):
        for _ in range(5):
            self.parallelizationFunc()

    def setup(self, unusedVar=None):
        vallist = [1,2,3,4,5]
        self.funclist = []
        for i in range(5):
            def tempfunc(argument, parameter=vallist[i]):
                print(parameter*argument)
            self.funclist.append(tempfunc)

    def parallelizationFunc(self, unusedVar=None):
        for step in range(25):
            for j in range(5):
                result = self.funclist[j](step)
                simulation.sendSignalToCorrectAgent(result)

if __name__ == "__main__":
    mp.freeze_support()

    c1 = task()
    c1.setup()
    c1.parallelizationWrap()
    # c1.serialWrap()

which gives me the error:

AttributeError: Can't pickle local object 'task.setup.<locals>.tempfunc'

I have considered just saving the parameters instead of the function directly, but that doesn't give me the flexibility I need, since the function can change.

I have also tried making it global as recommended here, but that gave me a similar pickling issue: _pickle.PicklingError: Can't pickle <function tempfunc at 0x000001861D0A3E20>: it's not the same object as __main__.tempfunc


Solution

  • I can smuggle the function in as a string and rebuild it later with exec, bypassing multiprocessing's pickling issues:

    Replace task.setup and task.parallelizationFunc, defining the tempFunc functions as strings and rebuilding them with Python 3's exec function. Strings can be pickled, so self.funclist now survives the trip to the worker processes:

    class taskWithFunctioningParallelization(task):
        def setup(self, unusedVar=None):
            vallist = [1,2,3,4,5]
            self.funclist = []
            for i in range(5):
                funcString = f"""def tempFunc(argument, parameter={vallist[i]}):
        print(parameter*argument)"""
                self.funclist.append(funcString)
    
        def parallelizationFunc(self, unusedVar=None):
            for step in range(10):
                for j in range(5):
                    funcString = self.funclist[j]
                    d = {}
                    exec(funcString, d)
                    # exec also adds __builtins__ to d, so look the function up by name
                    controllerFunc = d["tempFunc"]
                    controllerFunc(step)
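
A minimal, self-contained sketch of the same idea, in case it helps to see it outside the class. The helper names (build_controller_sources, compile_controller, run_controllers) are hypothetical and not part of the original script, and the controllers return their value instead of printing it so the result can be inspected. It assumes each function is compiled once per call rather than on every step, and it uses a pickle round trip to stand in for what multiprocessing does when it sends the list to a worker:

```python
import pickle


def build_controller_sources(values):
    """Return one source string per controller; plain strings pickle fine."""
    return [
        f"def tempFunc(argument, parameter={value!r}):\n"
        f"    return parameter * argument\n"
        for value in values
    ]


def compile_controller(source, name="tempFunc"):
    """Run the source in a fresh namespace and fetch the function by name."""
    namespace = {}
    exec(source, namespace)
    return namespace[name]


def run_controllers(sources, steps):
    """Compile each controller once, then evaluate it for every step."""
    controllers = [compile_controller(src) for src in sources]
    return [[func(step) for func in controllers] for step in range(steps)]


sources = build_controller_sources([1, 2, 3, 4, 5])
# The strings survive a pickle round trip, unlike the nested functions.
restored = pickle.loads(pickle.dumps(sources))
results = run_controllers(restored, steps=3)
```

Only strings cross the process boundary here, so nothing depends on the function being importable by name. The usual caveat with exec applies: the strings should come from code you control.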