I'm trying to scale my computations from local Dask arrays to Dask Distributed. Unfortunately, I am new to distributed computing, so I could not adapt the answer here for my purpose.
My main problem is saving data from distributed computations back to an in-memory Zarr array while using Dask's caching and graph optimization, so that I also get some statistics of the array back.
I want to preprocess some data, which includes subtracting a reference row and the row-wise mean. Doing this looks straightforward with plain Dask arrays:
import numpy as np
import zarr
import dask.array as da

# create toy data
zarry = zarr.open('./example.zarr', mode='w')
sample_data = zarry.create_group("sample_data", overwrite=True)
s1 = sample_data.create_dataset('sample1', shape=(5, 1_000), chunks=(5, 100), dtype=int)
s1[:,:] = np.random.randint(0, 100, (5, 1_000))
rec = da.from_zarr(s1)
analysed = zarry.create_group("analysed_data", overwrite=True)
r1 = analysed.create_dataset('analysed1', shape=(5, 1_000), chunks=(5, 100), dtype=float)
# computations
rec -= (rec[2] + 1e-15)                 # subtract reference row (epsilon avoids exact zeros)
rec -= rec.mean(axis=1, keepdims=True)  # center data row-wise
corrcoef_ = da.corrcoef(rec)
std_ = rec.std(axis=1)
# code in question
pp_rec = da.to_zarr(rec, r1, compute=False)
_, corrcoef_, std_ = da.compute(pp_rec, corrcoef_, std_, optimize_graph=True)
The above code produces a RuntimeError:
RuntimeError Traceback (most recent call last)
Input In [7], in <cell line: 9>()
6 corrcoef_ = da.corrcoef(rec)
7 std_ = rec.std(axis=1)
----> 9 pp_rec = da.to_zarr(rec, r1, lock=True, compute=True, return_stored=False)
File ~/opt/anaconda3/envs/sept22/lib/python3.10/site-packages/dask/array/core.py:3540, in to_zarr(arr, url, component, storage_options, overwrite, region, compute, return_stored, **kwargs)
3536 z = url
3537 if isinstance(z.store, (dict, MutableMapping)) and config.get(
3538 "scheduler", ""
3539 ) in ("dask.distributed", "distributed"):
-> 3540 raise RuntimeError(
3541 "Cannot store into in memory Zarr Array using "
3542 "the Distributed Scheduler."
3543 )
3545 if region is None:
3546 arr = arr.rechunk(z.chunks)
RuntimeError: Cannot store into in memory Zarr Array using the Distributed Scheduler.
Starting from this code, I am failing to translate it into a distributed environment, since I cannot use da.to_zarr directly. Is there any trick I am not aware of?
What works is to get three individual futures. However, I was wondering whether any optimizations are lost this way, since the relevant intermediate results for the std and the corrcoef could be calculated on the fly while the subtractions run.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(memory_limit="4GiB", n_workers=4)  # local test cluster
client = Client(cluster)
# code in question
f_cc, f_std, f_rec = client.compute((corrcoef_, std_, rec), optimize_graph=True)
r1[:] = f_rec.result()
corrcoef_ = f_cc.result()
std_ = f_std.result()
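For reference, a minimal sketch (persist is the standard Dask API; variable names follow the snippets above) of how the preprocessed array could be kept in cluster memory so that the statistics reuse its chunks instead of recomputing the preprocessing per output:

# persist() materializes the preprocessed chunks in worker memory;
# graphs built from the persisted array share those chunks
rec = rec.persist()
corrcoef_, std_ = da.compute(da.corrcoef(rec), rec.std(axis=1))
r1[:] = rec.compute()  # gather locally and write to the in-memory array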
Is there a better way to do this? Mainly not calculating the standard deviation and the correlation coefficient from scratch?
I really appreciate any help you can provide.
In case anyone ever stumbles upon the same problem: the solution is quite simple, but it took me some time to understand.
The key is to use a Zarr storage option that supports multi-processing, not just multi-threading. Since the in-memory store is only thread-safe, it causes errors as soon as you use Dask Distributed with multiple processes (see above).
By changing the Zarr storage to a DirectoryStore, everything works fine. A possible solution is:
import dask.array as da
import dask.distributed as dd
import zarr
with dd.LocalCluster() as cluster:
    with dd.Client(cluster) as client:
        test_array = da.random.random((100, 100))
        zstore = zarr.DirectoryStore('./example.zarr')
        test_array.to_zarr(zstore)
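Applied to the pipeline from the question, here is a sketch under the same assumption, i.e. that the target dataset lives in a directory-backed store (root and stored are illustrative names; the rest mirrors the question's snippets):

import dask.array as da
import dask.distributed as dd
import zarr

with dd.LocalCluster() as cluster, dd.Client(cluster) as client:
    # a directory-backed store is safe to write to from multiple processes
    root = zarr.open(zarr.DirectoryStore('./example.zarr'), mode='w')
    analysed = root.create_group('analysed_data', overwrite=True)
    r1 = analysed.create_dataset('analysed1', shape=(5, 1_000),
                                 chunks=(5, 100), dtype=float)

    rec = da.random.randint(0, 100, size=(5, 1_000), chunks=(5, 100)).astype(float)
    rec -= (rec[2] + 1e-15)                 # subtract reference row
    rec -= rec.mean(axis=1, keepdims=True)  # center row-wise

    # one compute call stores the array and evaluates both statistics
    stored = da.to_zarr(rec, r1, compute=False)
    _, corrcoef_, std_ = da.compute(stored, da.corrcoef(rec), rec.std(axis=1))

Because everything is submitted in a single compute call, Dask merges the graphs and shares the preprocessed chunks between the store task and the two statistics, which also addresses the original concern about computing the std and corrcoef from scratch.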