I'm trying to speed up a CPU-bound Python script (on Windows 11). Threads in Python do not seem to run on different CPU cores, so the only option I have is multiprocessing.
I have a big dictionary (11 GB memory footprint after loading from file), and I need to check whether calculated values are in that dictionary. The input for the calculation also comes from a file (100 GB in size). I can pool-map this input to the processes in batches, no problem. But I cannot copy the dictionary to every process because there is not enough memory for that. So I need a way for the processes to check whether a value (actually a string) is in the dictionary.
Any advice?
Pseudo program flow:
--main--
- load dictionary structure from file # 11GB memory footprint
- ...
- While not all chunks loaded
  - Load chunk of calcdata from file # (10.000 lines per chunk)
  - Distribute (map) calcdata-chunk to processes
- Wait for processes to complete all chunks
--process--
- for each element in subchunk
  - perform calculation
  - check if calculation in dictionary # here is my problem!
  - store result in file
Edit: after implementing the comments below, I am now at:
import datetime
import multiprocessing
import os
import time
from multiprocessing import Manager

def ReadDictFromFile():
    cnt = 0
    print("Reading dictionary from " + dictfilename)
    with open(dictfilename, encoding="utf-8", errors="replace") as f:
        next(f)  # skip first line (header)
        for line in f:
            s = line.rstrip("\n")
            (key, keyvalue) = s.split()
            shared_dict[str(key)] = keyvalue
            cnt = cnt + 1
            if (cnt % 1000000) == 0:  # log each 1000000 where we are
                print(cnt)
                return  # temp to speed up testing, not load whole dictionary atm
    print("Done loading dictionary")
def checkqlist(qlist):
    print(str(os.getpid()) + "-" + str(len(qlist)))
    for li in qlist:
        try:
            checkvalue = calculations(li)
            (found, keyval) = InMem(checkvalue)
            if found:
                print("FOUND!!! " + checkvalue + " " + keyval)
        except Exception as e:
            print("(" + str(os.getpid()) + ")Error log: %s" % repr(e))
            time.sleep(15)

def InMem(checkvalue):
    if checkvalue in shared_dict:
        return True, shared_dict[checkvalue]
    else:
        return False, ""
if __name__ == "__main__":
    start_time = time.time()
    global shared_dict
    manager = Manager()
    shared_dict = manager.dict()
    ReadDictFromFile()
    chunksize = 5
    nr_of_processes = 10
    with open(filetocheck, encoding="utf-8", errors="replace") as f:
        qlist = []
        for line in f:
            s = line.rstrip("\n")
            qlist.append(s)
            if len(qlist) >= (chunksize * nr_of_processes):
                chunked_list = [qlist[i:i+chunksize] for i in range(0, len(qlist), chunksize)]
                try:
                    with multiprocessing.Pool() as pool:
                        pool.map(checkqlist, chunked_list, nr_of_processes)  # problem: qlist is a single string, not a list of about 416 strings.
                except Exception as e:
                    print("error log: %s" % repr(e))
                    time.sleep(15)
                qlist = []  # start collecting the next batch
    logit("Completed! " + datetime.datetime.now().strftime("%I:%M%p on %B %d, %Y"))
    print("--- %s seconds ---" % (time.time() - start_time))
You can use a multiprocessing.Manager().dict() for this. It's the fastest IPC you can use to do the check between processes in Python. As for the memory size, just make it smaller by changing all values to None. On my PC it can do 33k membership checks every second, which is about 400 times slower than a normal dictionary.
from multiprocessing import Manager

manager = Manager()
shared_dict = manager.dict()
shared_dict.update({x: None for x in main_dictionary})  # keep the keys, drop the values
shared_dict["new_element"] = None  # to set another value
del shared_dict["new_element"]  # to delete a certain value
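If you want to check the throughput on your own machine, here is a rough sketch. The helper name `lookups_per_second` and the tiny test data are made up for illustration, and the numbers you get will depend heavily on your hardware and Python version, so treat it as a way to measure rather than as a benchmark.

```python
import time
from multiprocessing import Manager


def lookups_per_second(mapping, keys):
    """Time `key in mapping` for every key; return (hits, checks per second)."""
    start = time.perf_counter()
    hits = sum(1 for key in keys if key in mapping)
    elapsed = time.perf_counter() - start
    # Guard against a zero reading on a coarse clock.
    return hits, len(keys) / max(elapsed, 1e-9)


if __name__ == "__main__":
    # Hypothetical stand-in data: 10,000 keys whose values are all None.
    plain = dict.fromkeys(f"key{i}" for i in range(10_000))
    # 2,000 probes, half of which are present in the dictionary.
    probes = [f"key{i}" for i in range(0, 20_000, 10)]

    with Manager() as manager:
        shared = manager.dict(plain)  # every lookup is a round trip to the manager process
        for name, mapping in (("dict", plain), ("manager.dict", shared)):
            hits, rate = lookups_per_second(mapping, probes)
            print(f"{name}: {hits} hits, {rate:,.0f} checks/s")
```

The gap between the two lines of output is the price of going through the manager process for every single check.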
You can also use a dedicated in-memory database for this, like Redis, which can handle being polled by multiple processes at the same time.
@Sam Mason's suggestion to use WSL and fork may be better, but this one is the most portable.
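For comparison, this is roughly what the fork approach looks like. It is only a sketch under the assumption that you run on Linux or WSL, since the "fork" start method does not exist on native Windows. The names `big_dict`, `check` and `run_demo` are placeholders, and the small dictionary stands in for the real 11 GB one.

```python
import multiprocessing

# Placeholder for the big dictionary; it is filled once in the parent process.
big_dict = {}


def check(value):
    # A forked child sees the parent's big_dict as it was at fork time,
    # without the dictionary being pickled or sent over a pipe.
    return value in big_dict


def run_demo():
    big_dict.update({f"key{i}": None for i in range(1000)})
    # Request fork explicitly; this raises ValueError on native Windows.
    ctx = multiprocessing.get_context("fork")
    # The pool is created after the dictionary is loaded, so the workers inherit it.
    with ctx.Pool(2) as pool:
        return pool.map(check, ["key1", "missing", "key999"])


if __name__ == "__main__":
    print(run_demo())
```

Keep in mind that the pages are shared copy-on-write, and CPython updates reference counts inside the objects it touches, so the memory used by the children can still grow over time.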
Edit: to store it in the children's global scope, you have to pass it through the initializer.
def define_global(var):
    global shared_dict
    shared_dict = var

...

if __name__ == "__main__":
    ...
    with multiprocessing.Pool(initializer=define_global, initargs=(shared_dict,)) as pool:
        ...
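Putting the pieces together, a minimal end-to-end sketch could look like the following. `define_global` is the initializer from above; `in_mem`, `run_checks` and the sample keys are hypothetical names and data that I made up to keep the example small.

```python
import multiprocessing
from multiprocessing import Manager

shared_dict = None  # set in every worker process by define_global


def define_global(var):
    global shared_dict
    shared_dict = var


def in_mem(checkvalue):
    # Runs in a worker; the membership test is forwarded to the manager process.
    return checkvalue, checkvalue in shared_dict


def run_checks(known_keys, candidates, processes=2):
    with Manager() as manager:
        # Keys only, values are None to keep the memory footprint down.
        proxy = manager.dict(dict.fromkeys(known_keys))
        with multiprocessing.Pool(processes, initializer=define_global,
                                  initargs=(proxy,)) as pool:
            return pool.map(in_mem, candidates)


if __name__ == "__main__":
    print(run_checks(["a", "b"], ["a", "x", "b"]))
```

Only the small proxy object is handed to each worker, so the dictionary itself lives once, in the manager process.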
Pool.map has arguments (function, iterable, chunksize). You can leave chunksize empty, which has a good default, or set it to 1 if the tasks are big enough. Do NOT set it to any other value unless you clearly understand what it does; it is basically "tasks per worker", that is, the number of items from the iterable handed to a worker at a time.
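A small sketch to see this for yourself (the function names and the sample batches are made up): the function is always called once per element of the iterable, and chunksize only changes how many elements are shipped to a worker in one go.

```python
import multiprocessing


def describe(item):
    # Called once per element of the iterable, whatever chunksize is.
    return type(item).__name__, len(item)


def map_with_chunksize(batches, chunksize=None):
    with multiprocessing.Pool(2) as pool:
        return pool.map(describe, batches, chunksize)


if __name__ == "__main__":
    # Each element of the iterable is itself a list of strings (one batch).
    batches = [["a", "b", "c"], ["d", "e"], ["f"]]
    print(map_with_chunksize(batches))      # default chunksize
    print(map_with_chunksize(batches, 1))   # one batch per task
    print(map_with_chunksize(batches, 10))  # all batches sent as a single task
```

All three calls return the same result, with every call to `describe` receiving one whole batch. If your function ends up receiving a single string, the iterable you passed was a flat list of strings rather than a list of batches.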