Tags: python, dask, dask-distributed, dask-dataframe, dask-delayed

Apply a function over the columns of a Dask array


What is the most efficient way to apply a function to each column of a Dask array? As documented below, I've tried a number of things, but I still suspect that my use of Dask is rather amateurish.

I have a fairly wide and fairly long array, in the region of 3,000,000 x 10,000. I want to apply the ecdf function to each column of this array. The individual column results, stacked together, should result in an array with the same dimensions as the input array.

Consider the following tests and let me know which approach is ideal, or how I can improve them. I know I could just use the fastest one, but I really want to exploit the possibilities of Dask to the maximum; the arrays could also be several times bigger. At the same time, the results of my benchmarks are surprising to me. Maybe I don't understand the logic behind Dask correctly.

import numpy as np
import dask
import dask.array as da
from dask.distributed import Client, LocalCluster
from statsmodels.distributions.empirical_distribution import ECDF

### functions
def ecdf(x):
    """Empirical CDF of x, evaluated at x."""
    fn = ECDF(x)
    return fn(x)

def ecdf_array(X):
    """Apply ecdf to each column of a 2-d numpy array."""
    res = np.zeros_like(X)
    for i in range(X.shape[1]):
        res[:, i] = ecdf(X[:, i])
    return res

### set up scheduler / workers
n_workers = 10
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=4)
client = Client(cluster)

### create data set
X = da.random.random((100000, 100))  # dask array
Xarr = X.compute()  # numpy array

### traditional for loop
%timeit -r 10 foo1 = ecdf_array(Xarr)

### adjusting chunk size to 2d-array and map_blocks
X = X.rechunk(chunks=(X.shape[0], int(np.ceil(X.shape[1] / n_workers))))
Xm = X.map_blocks(ecdf_array, meta=np.array((), dtype='float'))
%timeit -r 10 foo2 = Xm.compute()

### adjusting chunk size to column size and map_blocks
X = X.rechunk(chunks=(X.shape[0],1))
Xm = X.map_blocks(lambda x: np.expand_dims(ecdf(np.squeeze(x)), 1), meta=np.array((), dtype='float'))
%timeit -r 10 foo3 = Xm.compute()

### map over columns by slicing
Xm = client.map(lambda i: ecdf(np.asarray(X[:, i])), range(X.shape[1]))
Xm = client.submit(lambda x: da.transpose(da.vstack(x)), Xm)
%timeit -r 10 foo4 = Xm.result()

### apply_along_axis
Xaa = da.apply_along_axis(lambda x: np.expand_dims(ecdf(x), 1), 0, X, dtype=X.dtype, shape=X.shape)
%timeit -r 10 foo5 = Xaa.compute()

### lazy loop
Xl = []

for i in range(X.shape[1]):
    Xl.append(dask.delayed(ecdf)(X[:, i]))
    
Xl = dask.delayed(da.vstack)(Xl)
%timeit -r 10 foo6 = Xl.compute()

According to my benchmarks, "map over columns by slicing" is the fastest approach, followed by "adjusting chunk size to column size & map_blocks" and the non-parallel "apply_along_axis".

Method                                              Results (10 loops)
traditional for loop                                2.16 s ± 82.3 ms
adjusting chunk size to 2d-array & map_blocks       1.26 s ± 301 ms
adjusting chunk size to column size & map_blocks    926 ms ± 31.9 ms
map over columns by slicing                         316 ms ± 11.5 ms
apply_along_axis                                    1.01 s ± 18.7 ms
lazy loop                                           1.4 s ± 352 ms

Based on my understanding of the idea behind Dask, I would have expected the "adjusting chunk size to 2d-array & map_blocks" method to be the fastest. The two approaches that performed best don't seem very "Dasky", while the non-parallel apply_along_axis is ranked third. All that gives me the suspicion that I am doing something wrong.


Solution

  • As far as I can tell, your code looks correct (see the explanation below for why the timing of "map over columns by slicing" is misleading). With some minor refactoring, the "dask-y" version might be:

    from dask.array.random import random
    from numpy import zeros
    from statsmodels.distributions.empirical_distribution import ECDF
    
    n_rows = 100_000
    X = random((n_rows, 100), chunks=(n_rows, 1))
    
    _ECDF = lambda x: ECDF(x.squeeze())(x)
    meta = zeros((n_rows, 1), dtype="float")
    foo0 = X.map_blocks(_ECDF, meta=meta)
    
    # executing foo0.compute() should take about 0.8s
    

    Note that the dask array is initialized with the appropriate chunking (one column per chunk), while in your current code the execution timing will include the time to rechunk the array.
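
    If you do want to benchmark your rechunked variants without the rechunking cost, one option (a minimal sketch, reusing the ecdf function and the running Client from your question) is to persist the rechunked array before the timed computation:

    import numpy as np
    import dask.array as da

    # rechunk to one column per chunk, then persist so the rechunking
    # work happens once, outside the timed region
    X = da.random.random((100_000, 100))
    X = X.rechunk((X.shape[0], 1)).persist()

    Xm = X.map_blocks(lambda x: np.expand_dims(ecdf(np.squeeze(x)), 1),
                      meta=np.array((), dtype="float"))
    # %timeit foo3 = Xm.compute()  # now times only the ECDF work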

    In terms of overall speed-up, the individual computations are tiny (on the scale of 50 ms), so to reduce the number of tasks it's possible to batch the processing of several columns into a single chunk. The trade-off is a slow-down from iterating over the columns of the numpy array inside each task; the main advantage is the reduced burden on the scheduler. Depending on the scale of your final dataset and the computing resources available, the chunked version might have a slight advantage over the non-chunked version (i.e. the first snippet):

    from dask.array.random import random
    from numpy import stack, zeros
    from statsmodels.distributions.empirical_distribution import ECDF
    
    n_rows = 100_000
    n_cols = 100
    chunk_size = (n_rows, 10)
    
    X = random((n_rows, n_cols), chunks=chunk_size)
    
    _ECDF = lambda x: ECDF(x.squeeze())(x)
    
    
    def block_ECDF(x):
        return stack([_ECDF(column) for column in x.T], axis=1)
    
    
    meta = zeros(chunk_size, dtype="float")
    foo0 = X.map_blocks(block_ECDF, meta=meta)
    # executing foo0.compute() should take about 0.8s
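
    To see the effect on the scheduler, you can compare the chunk grid and the size of the task graph for the two chunkings (a quick check using standard dask-array attributes, not part of the timing):

    # chunk grid of the input and total number of tasks in the output graph
    print(X.numblocks)                 # (1, 10) with chunk_size = (n_rows, 10)
    print(len(foo0.__dask_graph__()))  # far fewer tasks than with one-column chunks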
    

    Note that the fastest performer in your benchmarks is "map over columns by slicing". However, this is misleading: the work starts as soon as the futures are submitted, so what Python is timing here is largely just the collection of the computed results. Most of the time is spent on the computation itself, so the accurate way to time this approach is to start the timer when the futures are submitted and stop it when the results are collected.
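
    As a sketch (assuming the client, X, and ecdf from your question are in scope), the end-to-end timing would look like:

    import time
    import numpy as np

    start = time.perf_counter()  # start before the futures are submitted
    futures = client.map(lambda i: ecdf(np.asarray(X[:, i])), range(X.shape[1]))
    results = client.gather(futures)  # blocks until every column is computed
    print(f"end-to-end: {time.perf_counter() - start:.3f} s")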