I have data from a netCDF source (ECMWF ERA5), which I read using xarray. It has four dimensions (say x, y, z and t) and three variables (say r, h and g).
I have to write the data out to plain text files: one directory per (x, y) pair, each directory holding one file per variable containing that variable's values for all z and t.
A serial version of this code works just fine. A simplified representation is:
import xarray as xr

def extract_data(data, x, y, z, t):
    # Concatenate the values at this (x, y) for every (t, z) combination
    out_str = ''
    for t_val in t:
        for z_val in z:
            out_str += str(data.sel(x=x, y=y, t=t_val, z=z_val).data)
    return out_str

def data_get_write(dir, r, h, g, x, y, z, t):
    # Extract each variable and write it to its own file in the directory
    r_str = extract_data(r, x, y, z, t)
    with open(dir + 'r_file', 'w') as f_r:
        f_r.write(r_str)
    h_str = extract_data(h, x, y, z, t)
    with open(dir + 'h_file', 'w') as f_h:
        f_h.write(h_str)
    g_str = extract_data(g, x, y, z, t)
    with open(dir + 'g_file', 'w') as f_g:
        f_g.write(g_str)
    return <operation>
if __name__ == '__main__':
    ds = xr.open_dataset('data.nc')
    r = ds['r']
    h = ds['h']
    g = ds['g']
    x = ds['x']
    y = ds['y']
    z = ds['z']
    t = ds['t']
    dir_list = []
    x_list = []
    y_list = []
    for val_x in x:
        for val_y in y:
            dir = <operation>  # build the output directory for this (x, y)
            dir_list.append(dir)
            x_list.append(val_x)
            y_list.append(val_y)
    # Repeat the shared arguments so they zip up with the per-directory lists
    list_len = len(dir_list)
    r_list = [r] * list_len
    h_list = [h] * list_len
    g_list = [g] * list_len
    z_list = [z] * list_len
    t_list = [t] * list_len
    results = map(data_get_write, dir_list, r_list, h_list, g_list,
                  x_list, y_list, z_list, t_list)
I wrote it this way, with the map function call, hoping to parallelize the execution using concurrent.futures.ThreadPoolExecutor by replacing the map call as follows:
import concurrent.futures

max_workers = min(32, int(arguments["-n"]) + 4)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    results = executor.map(data_get_write, dir_list, r_list, h_list, g_list,
                           x_list, y_list, z_list, t_list)
However, this code actually runs slower than the serial version.
On my test dataset and machine (a 600 MB sample file on a small server), the serial version takes roughly 2'40" and the threaded version about 6'30". Changing the number of workers (from 1 to 20) makes no significant difference to either version.
I specifically chose ThreadPoolExecutor because the dataset is likely to be very large in real-world cases (several GB), and I fear that passing it by value to that many subprocesses might cause an out-of-memory (OOM) error. My test dataset only has around 20 values for each of the x and y dimensions and 6 for t; real-world cases run up to a thousand t values and around a hundred x and y, so roughly 100 · 100 · 1000 ≈ 10^7 point extractions versus 20 · 20 · 6 = 2400 here, three to four orders of magnitude more.
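The only memory-bounded alternative I can think of is to pass each worker just the file path and its (x, y) pair and let it open the dataset itself, relying on xarray's default lazy loading so that each process reads only its own slice. A rough, untested sketch (worker_extract is a hypothetical name):

def worker_extract(path, dir, x_val, y_val):
    # Hypothetical sketch: open the file lazily inside the worker so only
    # the selected (x, y) slice is ever read into this process's memory.
    with xr.open_dataset(path) as ds:
        for name in ('r', 'h', 'g'):
            out_str = ''
            for t_val in ds['t']:
                for z_val in ds['z']:
                    out_str += str(ds[name].sel(x=x_val, y=y_val,
                                                t=t_val, z=z_val).data)
            with open(dir + name + '_file', 'w') as f:
                f.write(out_str)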
Either I am doing something very wrong that slows down the parallel execution (entirely possible: this is my first code reading netCDF and my first attempt at parallelization), or concurrent.futures.ThreadPoolExecutor is not the way to go.
I have seen a similar question here, but the only suggestion offered is to use GNU parallel, which is not an option in my case: my code runs on clusters where I am not at liberty to install parallel, so the parallelization must live within my script and its libraries, which must be as standard as possible (hence xarray rather than netCDF4).
I have started reading about asyncio, but it is quite different from concurrent.futures and it looks like I would need to rewrite quite a bit; I would like to avoid going down another dead end.
ThreadPoolExecutor was the problem, probably because the work is CPU-bound Python code, so the GIL lets only one thread run at a time even though the shared objects are only being read.
Substituting ProcessPoolExecutor unlocks the situation: a real speedup is obtained, albeit at the cost of the memory overhead of holding up to max_workers pickled copies of the dataset at any given time.
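For reference, the substitution is minimal. A sketch, reusing the argument lists from the question (ProcessPoolExecutor additionally requires data_get_write and its arguments to be picklable):

import concurrent.futures

# Processes instead of threads: CPU-bound work no longer serializes on the
# GIL, but every argument is pickled and copied into each worker process.
max_workers = min(32, int(arguments["-n"]) + 4)
with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
    results = executor.map(data_get_write, dir_list, r_list, h_list, g_list,
                           x_list, y_list, z_list, t_list)

executor.map also accepts a chunksize parameter, which can help amortize the pickling overhead once the task list grows to real-world sizes.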