pythondaskpython-xarraynetcdf

Error when using xarray.apply_ufunc on a chunked xarray DataArray


Hello I am trying to sort the data inside some netCDF4 (.nc) files into bins as efficiently as possible. I am currently trying this with xarray and NumPy's digitize function. Since I want to process a large amount of files I am using xarray.open_mfdataset, but this seems to cause an error when I apply the digitize function to it using xarray.apply_ufunc. My code is below:

import numpy as np
import xarray as xr

precip_bins = np.arange(0, 120, .1) # mm/hr, 0.1mm/hr bins
ds = xr.open_mfdataset(['/home/data/2022_rain.nc',
                        '/home/data/2023_rain.nc'],
                       chunks={'time': -1, 'latitude': 500, 'longitude': 700})
hourly_precip_ds = ds['precipitation']


print(hourly_precip_ds)

# Digitize the data into bins
bin_idxs = xr.apply_ufunc(np.digitize,
                          hourly_precip_ds,
                          precip_bins,
                          input_core_dims=[['time'], []],
                          output_core_dims=[['time']],
                          dask="parallelized",
                          dask_gufunc_kwargs={'allow_rechunk': True}) - 1  # Subtract 1 to make bins 0-indexed

Output:

<xarray.DataArray 'precipitation' (time: 17481, latitude: 500, longitude: 700)> Size: 24GB
dask.array<concatenate, shape=(17481, 500, 700), dtype=float32, chunksize=(8760, 500, 700), chunktype=numpy.ndarray>
Coordinates:
  * time       (time) datetime64[ns] 140kB 2022-01-01 ... 2023-12-31T23:00:00
  * latitude   (latitude) float32 2kB 69.95 69.85 69.75 ... 20.25 20.15 20.05
  * longitude  (longitude) float32 3kB -24.95 -24.85 -24.75 ... 44.85 44.95
Attributes:
    units:    mm/hr

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[9], line 2
      1 # Digitize the data into bins
----> 2 bin_idxs = xr.apply_ufunc(np.digitize,
      3                           hourly_precip_ds,
      4                           precip_bins,
      5                           input_core_dims=[['time'], []],
      6                           output_core_dims=[['time']],
      7                           dask="parallelized",
      8                           dask_gufunc_kwargs={'allow_rechunk': True}) - 1  # Subtract 1 to make bins 0-indexed
      9 print("++++++++++++++++++++++++++++++++++++++++++++++++")
     10 print(bin_idxs)

File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/core/computation.py:1278, in apply_ufunc(func, input_core_dims, output_core_dims, exclude_dims, vectorize, join, dataset_join, dataset_fill_value, keep_attrs, kwargs, dask, output_dtypes, output_sizes, meta, dask_gufunc_kwargs, on_missing_core_dim, *args)
   1276 # feed DataArray apply_variable_ufunc through apply_dataarray_vfunc
   1277 elif any(isinstance(a, DataArray) for a in args):
-> 1278     return apply_dataarray_vfunc(
   1279         variables_vfunc,
   1280         *args,
   1281         signature=signature,
   1282         join=join,
   1283         exclude_dims=exclude_dims,
   1284         keep_attrs=keep_attrs,
   1285     )
   1286 # feed Variables directly through apply_variable_ufunc
   1287 elif any(isinstance(a, Variable) for a in args):

File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/core/computation.py:320, in apply_dataarray_vfunc(func, signature, join, exclude_dims, keep_attrs, *args)
    315 result_coords, result_indexes = build_output_coords_and_indexes(
    316     args, signature, exclude_dims, combine_attrs=keep_attrs
    317 )
    319 data_vars = [getattr(a, "variable", a) for a in args]
--> 320 result_var = func(*data_vars)
    322 out: tuple[DataArray, ...] | DataArray
    323 if signature.num_outputs > 1:

File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/core/computation.py:831, in apply_variable_ufunc(func, signature, exclude_dims, dask, output_dtypes, vectorize, keep_attrs, dask_gufunc_kwargs, *args)
    826     if vectorize:
    827         func = _vectorize(
    828             func, signature, output_dtypes=output_dtypes, exclude_dims=exclude_dims
    829         )
--> 831 result_data = func(*input_data)
    833 if signature.num_outputs == 1:
    834     result_data = (result_data,)

File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/core/computation.py:808, in apply_variable_ufunc.<locals>.func(*arrays)
    807 def func(*arrays):
--> 808     res = chunkmanager.apply_gufunc(
    809         numpy_func,
    810         signature.to_gufunc_string(exclude_dims),
    811         *arrays,
    812         vectorize=vectorize,
    813         output_dtypes=output_dtypes,
    814         **dask_gufunc_kwargs,
    815     )
    817     return res

File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/xarray/namedarray/daskmanager.py:155, in DaskManager.apply_gufunc(self, func, signature, axes, axis, keepdims, output_dtypes, output_sizes, vectorize, allow_rechunk, meta, *args, **kwargs)
    138 def apply_gufunc(
    139     self,
    140     func: Callable[..., Any],
   (...)
    151     **kwargs: Any,
    152 ) -> Any:
    153     from dask.array.gufunc import apply_gufunc
--> 155     return apply_gufunc(
    156         func,
    157         signature,
    158         *args,
    159         axes=axes,
    160         axis=axis,
    161         keepdims=keepdims,
    162         output_dtypes=output_dtypes,
    163         output_sizes=output_sizes,
    164         vectorize=vectorize,
    165         allow_rechunk=allow_rechunk,
    166         meta=meta,
    167         **kwargs,
    168     )

File ~/.conda/envs/geo_jupyter_env/lib/python3.12/site-packages/dask/array/gufunc.py:431, in apply_gufunc(func, signature, axes, axis, keepdims, output_dtypes, output_sizes, vectorize, allow_rechunk, meta, *args, **kwargs)
    428 for dim, sizes in dimsizess.items():
    429     #### Check that the arrays have same length for same dimensions or dimension `1`
    430     if set(sizes) | {1} != {1, max(sizes)}:
--> 431         raise ValueError(f"Dimension `'{dim}'` with different lengths in arrays")
    432     if not allow_rechunk:
    433         chunksizes = chunksizess[dim]

ValueError: Dimension `'__loopdim1__'` with different lengths in arrays

Solution

  • I am not 100% certain why the error appears but my guess is that the function believes you are trying to broadcast the ufunc over both the hourly_precip_ds xarray and the precip_bins array.

    Moving the precip_bins array into the kwargs parameter of apply_ufunc seems to fix the error.

    bin_idxs = xr.apply_ufunc(
      np.digitize,
      hourly_precip_ds,
      input_core_dims=[['time']],
      output_core_dims=[['time']],
      kwargs={"bins": precip_bins},
      dask="parallelized",
      dask_gufunc_kwargs={'allow_rechunk': True}
    ) - 1  # Subtract 1 to make bins 0-indexed