I am trying to execute a cython file in parallel. The skeleton of my code is:
def build_DF_single(lo, hi, map[long, set[string]] authors_id_map, map[long, set[string]] authors_org_map,
                    map[long, set[string]] fos_name_map, map[long, set[string]] publisher_map,
                    map[long, set[string]] year_map, map[long, set[long]] reference_map,
                    map[string, double] authors_id_prob, map[string, double] authors_org_prob,
                    map[string, double] fos_name_prob, map[string, double] publisher_prob,
                    map[string, double] year_prob, map[string, set[long]] authors_id_co,
                    map[string, set[long]] authors_org_co, map[string, set[long]] fos_name_co,
                    map[string, set[long]] publisher_co, map[string, set[long]] year_co,
                    map[long, vector[double]] doc2vec_map):
    for i in tqdm(range(lo, hi)):
        line = lines[i]
        # Data cleaning on <line>

def mmap(name):
    d = joblib.load("mmap/" + name + ".mmap", mmap_mode="r")
    gc.collect()
    return d
authors_id_prob = mmap("authors_id_prob")
authors_org_prob = mmap("authors_org_prob")
fos_name_prob = mmap("fos_name_prob")
publisher_prob = mmap("publisher_prob")
year_prob = mmap("year_prob")
authors_id_co = mmap("authors_id_co")
authors_org_co = mmap("authors_org_co")
fos_name_co = mmap("fos_name_co")
publisher_co = mmap("publisher_co")
year_co = mmap("year_co")
doc2vec_map = mmap("doc2vec_map")
with open("file", "r") as f:
lines = f.readlines() # Pretty large as well
batch_size = int(math.ceil(len(lines) / n_cpu))
results = Parallel(n_jobs = n_cpu, prefer="threads", max_nbytes=None)(delayed(build_DF_single)(
(i * batch_size), min((i + 1) * batch_size, len(lines)),
authors_id_map, authors_org_map, fos_name_map, publisher_map, year_map, reference_map, authors_id_prob, authors_org_prob, fos_name_prob, publisher_prob, year_prob, authors_id_co, authors_org_co, fos_name_co, publisher_co, year_co, doc2vec_map
) for i in range(n_cpu))
Here, authors_id_map, authors_org_map, fos_name_map, publisher_map, year_map, reference_map, authors_id_prob, authors_org_prob, fos_name_prob, publisher_prob, year_prob, authors_id_co, authors_org_co, fos_name_co, publisher_co, year_co and doc2vec_map are all very large C++ maps. Since I don't want them to be copied into every worker process, I turn them into memory maps instead. However, I get the following error when the code reaches the Parallel() call:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "buildDF.pyx", line 473, in buildDF.build_DF
    results = Parallel(n_jobs = n_cpu, require="sharedmem", prefer="threads", max_nbytes=None)(delayed(build_DF_single)(
  File "/home/zhangji/.local/lib/python2.7/site-packages/joblib/parallel.py", line 1004, in __call__
    if self.dispatch_one_batch(iterator):
  File "/home/zhangji/.local/lib/python2.7/site-packages/joblib/parallel.py", line 808, in dispatch_one_batch
    islice = list(itertools.islice(iterator, big_batch_size))
  File "buildDF.pyx", line 475, in genexpr
    authors_id_map, authors_org_map, fos_name_map, publisher_map, year_map, reference_map, authors_id_prob, authors_org_prob, fos_name_prob, publisher_prob, year_prob, authors_id_co, authors_org_co, fos_name_co, publisher_co, year_co, doc2vec_map
  File "stringsource", line 207, in map.to_py.__pyx_convert_map_to_py_std_3a__3a_string____double
MemoryError
Can anyone tell me what is going on? What is "stringsource"?
Thanks!
To follow up on my comments:
The reference to stringsource is slightly confusing. It refers to utility code that Cython generates internally. In this case that utility code (__pyx_convert_map_to_py_std_3a__3a_string____double) is being called to convert a C++ std::map<std::string, double> back into a Python dict.
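To see where such utility code comes from, here is a minimal, hypothetical Cython file (not taken from your code) that makes Cython emit exactly this converter; calling make_probs() from Python runs the same generated helper that your traceback attributes to "stringsource":

# distutils: language = c++
from libcpp.map cimport map
from libcpp.string cimport string

def make_probs():
    cdef map[string, double] probs
    probs[b"smith"] = 0.25
    # Returning a std::map to Python calls the generated
    # __pyx_convert_map_to_py_std_3a__3a_string____double helper,
    # which copies every entry into a brand-new Python dict.
    return probs

In your traceback the same kind of helper appears to run inside the generator expression, converting C++-typed variables back to Python objects before they are handed to delayed(build_DF_single).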
The basic problem looks to be that you are trying to do two mutually contradictory things:

1. You are using a Python wrapping of mmap to load a set of large files without needing to load them into memory all at once.
2. You are converting all your data to C++ std::map, presumably because you hope it will be faster or that it can be run without the GIL.
The conversion to std::map is not a "transparent wrapper" - it makes a completely new copy. Therefore all the data is loaded out of your mmap and directly into memory - hence the MemoryError (I think - it's difficult to be definite when you have not provided a minimal reproducible example).
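A rough sketch of why the conversion defeats the memory mapping (hypothetical code; it assumes the object loaded from the .mmap file behaves like an ordinary dict with byte-string keys):

# distutils: language = c++
from libcpp.map cimport map
from libcpp.string cimport string

def consume(probs):
    # This single assignment iterates the whole Python mapping and
    # copies every key and value into a brand-new std::map held in
    # ordinary RAM - the mmap-backed storage is not reused at all.
    cdef map[string, double] cpp_probs = probs
    return cpp_probs.size()

The same copying happens in reverse whenever a std::map has to be turned back into a Python object, so every crossing of the Python/C++ boundary costs a full copy of the data.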
There is no obvious solution. Either you need to stick to using Python objects so that your mmap can be preserved, or you need to implement a C++ class yourself that can load the data mmap-style, avoiding the conversion to std::map.
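As a sketch of the first option (purely illustrative; most of the arguments from your signature are omitted): leave the parameters untyped, so that joblib (with threads/sharedmem) can hand the mmap-backed objects straight through and no Cython conversion code is generated:

def build_DF_single(long lo, long hi, authors_id_prob, doc2vec_map):
    # Untyped (Python object) parameters: plain dict/set lookups
    # instead of std::map/std::set, so no "stringsource" converter
    # runs and nothing is copied out of the memory-mapped files.
    for i in range(lo, hi):
        line = lines[i]
        # data cleaning on <line>, as before

The trade-off is that the per-line work then stays at Python speed and holds the GIL, so the second option (a C++ class that reads the data mmap-style itself) is the one to pursue if you really need nogil performance.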