Tags: python, concurrent.futures, executor, process-pool

Multiprocessing Using Functions Held in Variables as the Called Function


If I have this:

import concurrent.futures

def SetParameters():
    return { "processVal" : 'q',
             "keys" : { "key1" : { "fn" : a,
                                   "future" : None,
                                   "futureResult" : None },
                        "key2" : { "fn" : b,
                                   "future" : None,
                                   "futureResult" : None }
                      }
           }

#in my target production code, a() and b() and around a dozen others will each
#take a long while to complete and are IO-bound, hence the desire for
#parallelization
def a(x):
    # x is return of SetParameters()
    return f"1{x['processVal']}2"

def b(x):
    # x is return of SetParameters()
    return f"3{x['processVal']}4"

def c(myDict):
    with concurrent.futures.ProcessPoolExecutor() as exe:
        for key in myDict["keys"]:
            keyDict = myDict["keys"][key]
            keyDict["future"] = exe.submit(keyDict["fn"], myDict)
        for key in myDict["keys"]:
            keyDict = myDict["keys"][key]
            keyDict["futureResult"] = keyDict["future"].result()

def main():
    data = SetParameters()
    c(data)
    for key in data["keys"]:
        print(data["keys"][key]["futureResult"])

if __name__ == "__main__":
    main()

When I execute this code I also have a runaway forking issue to troubleshoot, but the request in this thread is to understand this error and what to do about it:

TypeError: cannot pickle '_thread.RLock' object

I suspect it's the data structure returned by SetParameters, but only starting from the second pass through the first for loop. My instinct is that a Future object is non-picklable (hence the issue only appearing on the second pass), but how do I then get the function results into myDict["keys"][key<n>]["futureResult"] without knowing in advance how many key<n> entries there are?


Solution

  • Your guess about what's wrong is right. You are saving a Future object into the dictionary and then passing that dictionary to your parallelized functions, and ProcessPoolExecutor requires its arguments to be picklable; a Future holds a threading.Condition (and therefore a _thread.RLock) internally, which is exactly what the error names. You can confirm this with the snippet after the options below. I'm assuming you don't need the results inside the functions, so without knowing exactly how you'll use them, there are a couple of options.

    1. Simply pass what you need into the functions. Only plain, picklable data then crosses the process boundary; note that a() and b() below now receive just the processVal string instead of the whole dictionary.
    import concurrent.futures
    
    def SetParameters():
        return { "processVal" : 'q',
                 "keys" : { "key1" : { "fn" : a,
                                       "future" : None,
                                       "futureResult" : None },
                            "key2" : { "fn" : b,
                                       "future" : None,
                                       "futureResult" : None }
                          }
               }
    
    #in my target production code, a() and b() and around a dozen others will each
    #take a long while to complete and are IO-bound, hence the desire for
    #parallelization
    def a(x):
        # x is now just myDict['processVal'], not the whole dictionary
        return f"1{x}2"
    
    def b(x):
        # x is now just myDict['processVal'], not the whole dictionary
        return f"3{x}4"
    
    def c(myDict):
        with concurrent.futures.ProcessPoolExecutor() as exe:
            for key in myDict["keys"]:
                keyDict = myDict["keys"][key]
                keyDict["future"] = exe.submit(keyDict["fn"], myDict['processVal'])
            for key in myDict["keys"]:
                keyDict = myDict["keys"][key]
                keyDict["futureResult"] = keyDict["future"].result()
    
    def main():
        data = SetParameters()
        c(data)
        for key in data["keys"]:
            print(data["keys"][key]["futureResult"])
    
    if __name__ == "__main__":
        main()
    
    2. Make a deep copy of myDict before any futures are stored: the copy's "future" fields are still None, so it pickles cleanly. But know that the worker functions can't use the results (nor does it look like you would want them to).
    import concurrent.futures
    import copy
    
    def SetParameters():
        return { "processVal" : 'q',
                 "keys" : { "key1" : { "fn" : a,
                                       "future" : None,
                                       "futureResult" : None },
                            "key2" : { "fn" : b,
                                       "future" : None,
                                       "futureResult" : None }
                          }
               }
    
    #in my target production code, a() and b() and around a dozen others will each
    #take a long while to complete and are IO-bound, hence the desire for
    #parallelization
    def a(x):
        # x is return of SetParameters()
        return f"1{x['processVal']}2"
    
    def b(x):
        # x is return of SetParameters()
        return f"3{x['processVal']}4"
    
    def c(myDict):
        myCopy = copy.deepcopy(myDict)
        with concurrent.futures.ProcessPoolExecutor() as exe:
            for key in myDict["keys"]:
                keyDict = myDict["keys"][key]
                keyDict["future"] = exe.submit(keyDict["fn"], myCopy)
            for key in myDict["keys"]:
                keyDict = myDict["keys"][key]
                keyDict["futureResult"] = keyDict["future"].result()
    
    def main():
        data = SetParameters()
        c(data)
        for key in data["keys"]:
            print(data["keys"][key]["futureResult"])
    
    if __name__ == "__main__":
        main()
    
    3. Save the results somewhere else and merge them later (or don't). The futures never touch myDict here, so the original dictionary stays picklable; a variation on this approach appears at the end of this answer.
    import concurrent.futures
    from collections import defaultdict
    
    def SetParameters():
        return { "processVal" : 'q',
                 "keys" : { "key1" : { "fn" : a,
                                       "future" : None,
                                       "futureResult" : None },
                            "key2" : { "fn" : b,
                                       "future" : None,
                                       "futureResult" : None }
                          }
               }
    
    #in my target production code, a() and b() and around a dozen others will each
    #take a long while to complete and are IO-bound, hence the desire for
    #parallelization
    def a(x):
        # x is return of SetParameters()
        return f"1{x['processVal']}2"
    
    def b(x):
        # x is return of SetParameters()
        return f"3{x['processVal']}4"
    
    def c(myDict):
        myResults = defaultdict(dict)
        with concurrent.futures.ProcessPoolExecutor() as exe:
            for key in myDict["keys"]:
                myResults[key]["future"] = exe.submit(myDict['keys'][key]["fn"], myDict)
            for key in myDict["keys"]:
                myResults[key]["futureResult"] = myResults[key]["future"].result()
    
        return myResults
    
    def main():
        data = SetParameters()
        res = c(data)
        for result in res.values():
            print(result["futureResult"])
    
    if __name__ == "__main__":
        main()
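    
    To confirm the diagnosis, you can try to pickle a bare Future yourself; this minimal check reproduces the exact error:
    
    import concurrent.futures
    import pickle
    
    # A Future carries a threading.Condition (and thus a _thread.RLock)
    # in its instance state, so the pickler rejects it.
    try:
        pickle.dumps(concurrent.futures.Future())
    except TypeError as err:
        print(err)  # cannot pickle '_thread.RLock' object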
    

    I'm sure there are more options, but those are some general workarounds depending on what you are trying to achieve.
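
    For instance, here is a minimal sketch of a variation on option 3 (reusing SetParameters, a, and b from above; the futures-to-key mapping is my own illustration, not from your code). It uses concurrent.futures.as_completed, which yields each future as it finishes rather than in submission order:

    import concurrent.futures

    def c(myDict):
        results = {}
        with concurrent.futures.ProcessPoolExecutor() as exe:
            # The futures live only in this local mapping, so myDict
            # itself stays picklable when it is sent to the workers.
            futures = { exe.submit(keyDict["fn"], myDict): key
                        for key, keyDict in myDict["keys"].items() }
            for future in concurrent.futures.as_completed(futures):
                results[futures[future]] = future.result()
        return results

    Since this returns a plain key-to-result mapping, the loop in main() becomes simply for result in res.values(): print(result).

    Finally, since your tasks are IO-bound, it's worth noting that concurrent.futures.ThreadPoolExecutor sidesteps pickling entirely: threads share memory, so arguments are never serialized, and the GIL is released while waiting on IO. Your original code runs unchanged (Futures stored in the dictionary and all) if exe is a ThreadPoolExecutor instead.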