If I have this:
import concurrent.futures

def SetParameters():
    return { "processVal" : 'q',
             "keys" : { "key1" : { "fn" : a,
                                   "future" : None,
                                   "futureResult" : None },
                        "key2" : { "fn" : b,
                                   "future" : None,
                                   "futureResult" : None }
                      }
           }

# in my target production code, a() and b() and around a dozen others will each
# take a long while to complete and are IO-bound, hence the desire for
# parallelization
def a(x):
    # x is return of SetParameters()
    return f"1{x['processVal']}2"

def b(x):
    # x is return of SetParameters()
    return f"3{x['processVal']}4"

def c(myDict):
    with concurrent.futures.ProcessPoolExecutor() as exe:
        for key in myDict["keys"]:
            keyDict = myDict["keys"][key]
            keyDict["future"] = exe.submit(keyDict["fn"], myDict)
        for key in myDict["keys"]:
            keyDict = myDict["keys"][key]
            keyDict["futureResult"] = keyDict["future"].result()

def main():
    data = SetParameters()
    c(data)
    for key in data["keys"]:
        print(data["keys"][key]["futureResult"])

if __name__ == "__main__":
    main()
When I execute this code I also have a runaway forking issue to troubleshoot, but my question here is about understanding this error and what to do about it:

TypeError: cannot pickle '_thread.RLock' object

I suspect the culprit is the data structure returned by SetParameters(), but only starting from the second pass through the first for loop in c(). My instinct is that a Future object is non-picklable (hence the issue only appearing on the second pass), but how do I then get the function results back into myDict["keys"][key<n>]["futureResult"] without knowing in advance how many key<n> entries there are?
Your guess about what's wrong is correct. You are storing a Future object in the dictionary and then passing that same dictionary to your parallelized functions; ProcessPoolExecutor pickles every argument it sends to a worker process, and a Future is not picklable because it holds an internal lock (the '_thread.RLock' the traceback names). On the first submit() the dictionary still contains only None placeholders, so it pickles fine; from the second submit() onward it contains the Future stored by the previous iteration, which is why the error only appears on the second pass. I'm assuming the worker functions don't need the results themselves, so without knowing exactly how you'll use them, here are a couple of options.
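You can reproduce the failure in isolation, which makes the cause easier to see (a minimal sketch, separate from your code):

import concurrent.futures
import pickle

def work(x):
    return x * 2

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as exe:
        fut = exe.submit(work, 21)
        try:
            # a Future holds a threading.Condition (and thus an RLock) internally
            pickle.dumps(fut)
        except TypeError as err:
            print(err)  # cannot pickle '_thread.RLock' object

The first option is to pass each worker only the value it actually needs (here processVal, a plain string), so the dictionary that accumulates the futures never crosses the process boundary: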
import concurrent.futures

def SetParameters():
    return { "processVal" : 'q',
             "keys" : { "key1" : { "fn" : a,
                                   "future" : None,
                                   "futureResult" : None },
                        "key2" : { "fn" : b,
                                   "future" : None,
                                   "futureResult" : None }
                      }
           }

# in my target production code, a() and b() and around a dozen others will each
# take a long while to complete and are IO-bound, hence the desire for
# parallelization
def a(x):
    # x is now just processVal, not the whole dict
    return f"1{x}2"

def b(x):
    # x is now just processVal, not the whole dict
    return f"3{x}4"

def c(myDict):
    with concurrent.futures.ProcessPoolExecutor() as exe:
        for key in myDict["keys"]:
            keyDict = myDict["keys"][key]
            # only the plain string is pickled and sent to the worker
            keyDict["future"] = exe.submit(keyDict["fn"], myDict['processVal'])
        for key in myDict["keys"]:
            keyDict = myDict["keys"][key]
            keyDict["futureResult"] = keyDict["future"].result()

def main():
    data = SetParameters()
    c(data)
    for key in data["keys"]:
        print(data["keys"][key]["futureResult"])

if __name__ == "__main__":
    main()
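A second option is to take a copy.deepcopy() snapshot of the dictionary before submitting anything. The copy is made while every "future" slot is still None, so it stays picklable no matter how many futures accumulate in the original (at the cost of the workers seeing a stale copy):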
import concurrent.futures
import copy

def SetParameters():
    return { "processVal" : 'q',
             "keys" : { "key1" : { "fn" : a,
                                   "future" : None,
                                   "futureResult" : None },
                        "key2" : { "fn" : b,
                                   "future" : None,
                                   "futureResult" : None }
                      }
           }

# in my target production code, a() and b() and around a dozen others will each
# take a long while to complete and are IO-bound, hence the desire for
# parallelization
def a(x):
    # x is return of SetParameters()
    return f"1{x['processVal']}2"

def b(x):
    # x is return of SetParameters()
    return f"3{x['processVal']}4"

def c(myDict):
    # snapshot taken while every "future" slot is still None, so it pickles
    myCopy = copy.deepcopy(myDict)
    with concurrent.futures.ProcessPoolExecutor() as exe:
        for key in myDict["keys"]:
            keyDict = myDict["keys"][key]
            keyDict["future"] = exe.submit(keyDict["fn"], myCopy)
        for key in myDict["keys"]:
            keyDict = myDict["keys"][key]
            keyDict["futureResult"] = keyDict["future"].result()

def main():
    data = SetParameters()
    c(data)
    for key in data["keys"]:
        print(data["keys"][key]["futureResult"])

if __name__ == "__main__":
    main()
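A third option is to keep the futures out of the shared dictionary altogether and collect them in a separate local structure, so the dictionary you pass to the workers is never mutated: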
import concurrent.futures
from collections import defaultdict

def SetParameters():
    return { "processVal" : 'q',
             "keys" : { "key1" : { "fn" : a,
                                   "future" : None,
                                   "futureResult" : None },
                        "key2" : { "fn" : b,
                                   "future" : None,
                                   "futureResult" : None }
                      }
           }

# in my target production code, a() and b() and around a dozen others will each
# take a long while to complete and are IO-bound, hence the desire for
# parallelization
def a(x):
    # x is return of SetParameters()
    return f"1{x['processVal']}2"

def b(x):
    # x is return of SetParameters()
    return f"3{x['processVal']}4"

def c(myDict):
    # futures live only in this local structure; myDict never holds one
    myResults = defaultdict(dict)
    with concurrent.futures.ProcessPoolExecutor() as exe:
        for key in myDict["keys"]:
            myResults[key]["future"] = exe.submit(myDict['keys'][key]["fn"], myDict)
        for key in myDict["keys"]:
            myResults[key]["futureResult"] = myResults[key]["future"].result()
    return myResults

def main():
    data = SetParameters()
    res = c(data)
    for result in res.values():
        print(result["futureResult"])

if __name__ == "__main__":
    main()
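For example, since the futures themselves aren't needed once c() returns, you could key a plain local dict by future and let concurrent.futures.as_completed() drive the collection (a sketch under the same assumptions as above, dropping in as a replacement for c()):

import concurrent.futures

def c(myDict):
    with concurrent.futures.ProcessPoolExecutor() as exe:
        # map each future back to its key; myDict itself never holds a future
        futures = { exe.submit(keyDict["fn"], myDict) : key
                    for key, keyDict in myDict["keys"].items() }
        for fut in concurrent.futures.as_completed(futures):
            myDict["keys"][futures[fut]]["futureResult"] = fut.result()

It's also worth noting that you describe the real tasks as IO-bound; for that workload concurrent.futures.ThreadPoolExecutor is usually the better fit, and it sidesteps this problem entirely, because threads share memory and no arguments get pickled.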
I'm sure there are more possibilities, but those are some general workarounds, depending on what you are trying to achieve.