Hello I am trying to sort the data inside some netCDF4 (.nc) files into bins as efficiently as possible. I am currently trying this with xarray and NumPy's digitize function. Since I want to process a large amount of files I am using xarray.open_mfdataset, but this seems to cause an error when I apply the digitize function to it using xarray.apply_ufunc. My code is below:
import numpy as np
import xarray as xr
precip_bins = np.arange(0, 120, .1) # mm/hr, 0.1mm/hr bins
ds = xr.open_mfdataset(['/home/data/2022_rain.nc',
'/home/data/2023_rain.nc'],
chunks={'time': -1, 'latitude': 500, 'longitude': 700})
hourly_precip_ds = ds['precipitation']
print(hourly_precip_ds)
# Digitize the data into bins
bin_idxs = xr.apply_ufunc(np.digitize,
hourly_precip_ds,
precip_bins,
input_core_dims=[['time'], []],
output_core_dims=[['time']],
dask="parallelized",
dask_gufunc_kwargs={'allow_rechunk': True}) - 1 # Subtract 1 to make bins 0-indexed
Output:
<xarray.DataArray 'precipitation' (time: 17481, latitude: 500, longitude: 700)> Size: 24GB
dask.array<concatenate, shape=(17481, 500, 700), dtype=float32, chunksize=(8760, 500, 700), chunktype=numpy.ndarray>
Coordinates:
* time (time) datetime64[ns] 140kB 2022-01-01 ... 2023-12-31T23:00:00
* latitude (latitude) float32 2kB 69.95 69.85 69.75 ... 20.25 20.15 20.05
* longitude (longitude) float32 3kB -24.95 -24.85 -24.75 ... 44.85 44.95
Attributes:
units: mm/hr
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Cell In[9], line 2
1 # Digitize the data into bins
----> 2 bin_idxs = xr.apply_ufunc(np.digitize,
3 hourly_precip_ds,
4 precip_bins,
5 input_core_dims=[['time'], []],
6 output_core_dims=[['time']],
7 dask="parallelized",
8 dask_gufunc_kwargs={'allow_rechunk': True}) - 1 # Subtract 1 to make bins 0-indexed
9 print("++++++++++++++++++++++++++++++++++++++++++++++++")
10 print(bin_idxs)
File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/core/computation.py:1278, in apply_ufunc(func, input_core_dims, output_core_dims, exclude_dims, vectorize, join, dataset_join, dataset_fill_value, keep_attrs, kwargs, dask, output_dtypes, output_sizes, meta, dask_gufunc_kwargs, on_missing_core_dim, *args)
1276 # feed DataArray apply_variable_ufunc through apply_dataarray_vfunc
1277 elif any(isinstance(a, DataArray) for a in args):
-> 1278 return apply_dataarray_vfunc(
1279 variables_vfunc,
1280 *args,
1281 signature=signature,
1282 join=join,
1283 exclude_dims=exclude_dims,
1284 keep_attrs=keep_attrs,
1285 )
1286 # feed Variables directly through apply_variable_ufunc
1287 elif any(isinstance(a, Variable) for a in args):
File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/core/computation.py:320, in apply_dataarray_vfunc(func, signature, join, exclude_dims, keep_attrs, *args)
315 result_coords, result_indexes = build_output_coords_and_indexes(
316 args, signature, exclude_dims, combine_attrs=keep_attrs
317 )
319 data_vars = [getattr(a, "variable", a) for a in args]
--> 320 result_var = func(*data_vars)
322 out: tuple[DataArray, ...] | DataArray
323 if signature.num_outputs > 1:
File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/core/computation.py:831, in apply_variable_ufunc(func, signature, exclude_dims, dask, output_dtypes, vectorize, keep_attrs, dask_gufunc_kwargs, *args)
826 if vectorize:
827 func = _vectorize(
828 func, signature, output_dtypes=output_dtypes, exclude_dims=exclude_dims
829 )
--> 831 result_data = func(*input_data)
833 if signature.num_outputs == 1:
834 result_data = (result_data,)
File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/core/computation.py:808, in apply_variable_ufunc.<locals>.func(*arrays)
807 def func(*arrays):
--> 808 res = chunkmanager.apply_gufunc(
809 numpy_func,
810 signature.to_gufunc_string(exclude_dims),
811 *arrays,
812 vectorize=vectorize,
813 output_dtypes=output_dtypes,
814 **dask_gufunc_kwargs,
815 )
817 return res
File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/namedarray/daskmanager.py:155, in DaskManager.apply_gufunc(self, func, signature, axes, axis, keepdims, output_dtypes, output_sizes, vectorize, allow_rechunk, meta, *args, **kwargs)
138 def apply_gufunc(
139 self,
140 func: Callable[..., Any],
(...)
151 **kwargs: Any,
152 ) -> Any:
153 from dask.array.gufunc import apply_gufunc
--> 155 return apply_gufunc(
156 func,
157 signature,
158 *args,
159 axes=axes,
160 axis=axis,
161 keepdims=keepdims,
162 output_dtypes=output_dtypes,
163 output_sizes=output_sizes,
164 vectorize=vectorize,
165 allow_rechunk=allow_rechunk,
166 meta=meta,
167 **kwargs,
168 )
File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/dask/array/gufunc.py:431, in apply_gufunc(func, signature, axes, axis, keepdims, output_dtypes, output_sizes, vectorize, allow_rechunk, meta, *args, **kwargs)
428 for dim, sizes in dimsizess.items():
429 #### Check that the arrays have same length for same dimensions or dimension `1`
430 if set(sizes) | {1} != {1, max(sizes)}:
--> 431 raise ValueError(f"Dimension `'{dim}'` with different lengths in arrays")
432 if not allow_rechunk:
433 chunksizes = chunksizess[dim]
ValueError: Dimension `'__loopdim1__'` with different lengths in arrays
I am not 100% certain why the error appears but my guess is that the function believes you are trying to broadcast the ufunc over both the hourly_precip_ds xarray and the precip_bins array.
Moving the precip_bins array into the kwargs parameter of apply_ufunc seems to fix the error.
bin_idxs = xr.apply_ufunc(
np.digitize,
hourly_precip_ds,
input_core_dims=[['time']],
output_core_dims=[['time']],
kwargs={"bins": precip_bins},
dask="parallelized",
dask_gufunc_kwargs={'allow_rechunk': True}
) - 1 # Subtract 1 to make bins 0-indexed