python · dask · dask-distributed · zarr

How to store data from dask.distributed on disk?


I'm trying to scale my computations from local Dask arrays to Dask Distributed. Unfortunately, I am new to distributed computing, so I could not adapt the answer here for my purpose.

Mainly, my problem is saving data from distributed computations back to an in-memory Zarr array while using Dask's caching and graph optimization, so that some statistics of the array come back from the same pass as well.

I want to preprocess some data, which includes subtracting one reference row and the row-wise mean. Doing this is straightforward with local Dask arrays:

import dask.array as da
import numpy as np
import zarr

# create toy data
zarry = zarr.open('./example.zarr', mode='w')
sample_data = zarry.create_group("sample_data", overwrite=True)
s1 = sample_data.create_dataset('sample1', shape=(5, 1_000), chunks=(5, 100), dtype=int)
s1[:,:] = np.random.randint(0, 100, (5, 1_000))
data = da.from_zarr(s1)
analysed = zarry.create_group("analysed_data", overwrite=True)
r1 = analysed.create_dataset('analysed1', shape=(5, 1_000), chunks=(5, 100), dtype=float)

# computations
data -= (data[2] + 1e-15)  # subtract reference row (epsilon keeps values away from exact zero)
data -= data.mean(axis=1, keepdims=True)  # center data row-wise
corrcoef_ = da.corrcoef(data)
std_ = data.std(axis=1)

# code in question
pp_data = da.to_zarr(data, r1, compute=False)

_, corrcoef_, std_ = da.compute(pp_data, corrcoef_, std_, optimize_graph=True)


The above code produces a RuntimeError: Cannot store into in memory Zarr Array using the Distributed Scheduler.

RuntimeError          Traceback (most recent call last)
Input In [7], in <cell line: 9>()
      6 corrcoef_ = da.corrcoef(rec)
      7 std_ = rec.std(axis=1)
----> 9 pp_rec = da.to_zarr(rec, r1, lock=True, compute=True, return_stored=False)

File ~/opt/anaconda3/envs/sept22/lib/python3.10/site-packages/dask/array/core.py:3540, in to_zarr(arr, url, component, storage_options, overwrite, region, compute, return_stored, **kwargs)
   3536 z = url
   3537 if isinstance(z.store, (dict, MutableMapping)) and config.get(
   3538     "scheduler", ""
   3539 ) in ("dask.distributed", "distributed"):
-> 3540     raise RuntimeError(
   3541         "Cannot store into in memory Zarr Array using "
   3542         "the Distributed Scheduler."
   3543     )
   3545 if region is None:
   3546     arr = arr.rechunk(z.chunks)

RuntimeError: Cannot store into in memory Zarr Array using the Distributed Scheduler.
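
As the traceback shows, Dask raises this error whenever the store backing the target array is a plain dict or another MutableMapping while the distributed scheduler is configured. A quick way to see which storage class you are actually dealing with is to inspect the array's store attribute (using the toy data from above):

# check which storage class backs the target Zarr array; per the traceback,
# Dask refuses the write when it is a dict/MutableMapping-based store
print(type(r1.store))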

Starting from this code, I am failing to translate it into a distributed environment, since I cannot use da.to_zarr directly. Is there any trick I am not aware of?

What does work is computing three individual futures. However, I was wondering whether any optimizations still apply, since intermediate results relevant for the std and corrcoef can be calculated on the fly while subtracting, etc.

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(memory_limit="4GiB", n_workers=4)  # local test cluster
client = Client(cluster)

# code in question: one future per output
f_cc, f_std, f_rec = client.compute((corrcoef_, std_, data), optimize_graph=True)
r1[:] = f_rec.result()  # write the preprocessed array into the Zarr dataset
corrcoef_ = f_cc.result()
std_ = f_std.result()
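
Alternatively, the three futures can be collected in a single round trip with client.gather (a small variant of the same approach):

# gather all three results at once instead of calling .result() on each future
cc_res, std_res, rec_res = client.gather([f_cc, f_std, f_rec])
r1[:] = rec_res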

Is there a better way to do this, mainly one that avoids calculating the standard deviation and the correlation coefficient from scratch?

I really appreciate any help you can provide.


Solution

  • In case anyone ever stumbles over the same problem: the solution is quite simple, but it took me some time to understand.

    The key is to use a Zarr storage option that allows multi-processing, not just multi-threading. Since the in-memory storage option is only thread-safe, it causes errors as soon as you use Dask Distributed with multiple processes (see above).

    By changing the Zarr storage to a DirectoryStore, everything works fine. A possible solution is:

    import dask.array as da
    import dask.distributed as dd
    import zarr
    
    with dd.LocalCluster() as cluster:
        with dd.Client(cluster) as client:
            test_array = da.random.random((100, 100))
            # DirectoryStore keeps the chunks as files on disk, so every
            # worker process can write its part independently
            zstore = zarr.DirectoryStore('./example.zarr')
            test_array.to_zarr(zstore)
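
    To tie this back to the original question: with a process-safe store, the delayed to_zarr write and the statistics can go through a single da.compute call, so the preprocessed intermediates are shared and evaluated only once. This is a sketch under the same toy-data assumptions as above (the shapes, chunking, and component name are illustrative):

    import dask.array as da
    import dask.distributed as dd
    import zarr
    
    with dd.LocalCluster(n_workers=4, memory_limit="4GiB") as cluster:
        with dd.Client(cluster) as client:
            # process-safe, on-disk store instead of an in-memory one
            zstore = zarr.DirectoryStore('./example.zarr')
    
            data = da.random.random((5, 1_000), chunks=(5, 100))
            data = data - data[2]                           # subtract reference row
            data = data - data.mean(axis=1, keepdims=True)  # center row-wise
    
            corrcoef_ = da.corrcoef(data)
            std_ = data.std(axis=1)
    
            # delay the write and compute everything in one pass, so the
            # preprocessed array is shared by the store, std, and corrcoef tasks
            store_task = data.to_zarr(
                zstore, component="analysed", overwrite=True, compute=False
            )
            _, corrcoef_, std_ = da.compute(store_task, corrcoef_, std_)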