python, parallel-processing, dask, dask-distributed, dask-dataframe

Row-wise apply on a Dask DataFrame of strings is too slow


I have a Dask DataFrame with no missing values. I am trying to apply a function to all but the first two columns, as shown below:

Sample input:

    R  A   T   U   V
0   R  A  00  10  11
1   R  A  00  10  11
2   R  A  00  10  11
3   R  A  00  10  11
4   R  A  00  10  11
.. .. ..  ..  ..  ..
95  R  A  11  00  00
96  R  A  11  00  00
97  R  A  11  00  00
98  R  A  11  00  00
99  R  A  11  00  00

Each cell in the remaining columns is a two-character string; the function replaces every '0' with the row's R value and every '1' with its A value, then lowercases the result (e.g. '10' becomes 'ar' when R='R' and A='A'). The number of distinct strings in each such column is very small (<50); a simplified version is used below.

Output:

     T   U   V
0   rr  ar  aa
1   rr  ar  aa
2   rr  ar  aa
3   rr  ar  aa
4   rr  ar  aa
..  ..  ..  ..
95  aa  rr  rr
96  aa  rr  rr
97  aa  rr  rr
98  aa  rr  rr
99  aa  rr  rr

Current code:

from dask.distributed import Client, progress
client = Client(n_workers=20, threads_per_worker=1)
client  # displays the cluster info in a notebook

import pandas as pd
import dask.dataframe as dd

def score(x):
    # Lowercase the two-character string.
    return str(x[0] + x[1]).lower()

def func(row, i, fs):
    # Replace '0' with the row's R value and '1' with its A value,
    # then score the translated cell in column i.
    r = row[0]
    a = row[1]
    row[i] = fs(row[i].replace('0', r).replace('1', a))
    return row

s1 = ['00'] * 25 + ['01'] * 25 + ['10'] * 25 + ['11'] * 25
s2 = ['10'] * 25 + ['11'] * 25 + ['01'] * 25 + ['00'] * 25

df = pd.DataFrame({'R': ['R'] * 100, 'A': ['A'] * 100, 'T': s1, 'U': s2, 'V': reversed(s1)})
ddf = dd.from_pandas(df, npartitions=10)

meta = {cn: 'object' for cn in ddf.columns}

ddf.compute()
for i in range(2, len(ddf.columns)):
    ddf = ddf.apply(func, args=(i, score), axis=1, meta=meta)

ddf = ddf.drop(['R', 'A'], axis=1)
ddf.compute()

I tried using Numba and similar approaches but did not have success. I would be very grateful for any improvements to the above code.


Solution

  • This specific example can be vectorized into simpler statements for greater efficiency (a sketch is given after the code below). However, if the actual code is more complex and the separation of score and func is necessary, then the main bottleneck is in these lines:

    for i in range(2, len(ddf.columns)):
        ddf = ddf.apply(func, args=(i, score), axis=1, meta=meta)
    

    The code above is a bottleneck because operations that could run in parallel are executed sequentially: each ddf.apply depends on the result of the previous one, so the column for i=3 cannot be processed until the column for i=2 has been.

    A simple way to remove this bottleneck is to put the loop over columns inside func:

    from dask.dataframe import from_dict
    from pandas import DataFrame, Series
    
    
    def score(x):
        return str(x[0] + x[1]).lower()
    
    
    def func(row, fs):
        # Translate every column after the first two in a single pass,
        # so one row-wise apply handles all columns at once.
        r = row[0]
        a = row[1]
        new_row = []
        for cell in row[2:]:
            _value = fs(cell.replace("0", r).replace("1", a))
            new_row.append(_value)
        return new_row
    
    
    s1 = ["00"] * 25 + ["01"] * 25 + ["10"] * 25 + ["11"] * 25
    s2 = ["10"] * 25 + ["11"] * 25 + ["01"] * 25 + ["00"] * 25
    data = {"R": ["R"] * 100, "A": ["A"] * 100, "T": s1, "U": s2, "V": reversed(s1)}
    
    ddf = from_dict(data, npartitions=10)
    
    # func returns a plain list, so a second apply over pandas.Series expands
    # it back into three string columns (named 0, 1, and 2 by this meta).
    meta = DataFrame(columns=[0, 1, 2]).astype(str)
    ddf = ddf.apply(func, args=(score,), axis=1).apply(Series, meta=meta)
    ddf.compute()
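
    The result columns are named 0, 1, and 2; if the original names are wanted, they can be restored afterwards with, e.g., ddf = ddf.rename(columns={0: "T", 1: "U", 2: "V"}).

    For reference, here is one way the vectorization mentioned at the start could look. This is only a minimal sketch using map_partitions: it assumes the rule is exactly the one in the question (each '0' becomes the row's R value, each '1' its A value, then lowercase), and the translate helper name is illustrative:

    import pandas as pd
    import dask.dataframe as dd


    def translate(part):
        # part is an ordinary pandas DataFrame holding one partition.
        out = {}
        for c in ["T", "U", "V"]:
            # Pick the row's R value where the character is '0', else its A value.
            first = part["R"].where(part[c].str[0].eq("0"), part["A"])
            second = part["R"].where(part[c].str[1].eq("0"), part["A"])
            out[c] = (first + second).str.lower()
        return pd.DataFrame(out, index=part.index)


    s1 = ["00"] * 25 + ["01"] * 25 + ["10"] * 25 + ["11"] * 25
    s2 = ["10"] * 25 + ["11"] * 25 + ["01"] * 25 + ["00"] * 25
    df = pd.DataFrame({"R": ["R"] * 100, "A": ["A"] * 100, "T": s1, "U": s2, "V": list(reversed(s1))})
    ddf = dd.from_pandas(df, npartitions=10)

    meta = {"T": "object", "U": "object", "V": "object"}
    result = ddf.map_partitions(translate, meta=meta)
    result.compute()

    Because map_partitions calls translate once per partition with ordinary pandas objects, the per-row Python overhead of apply disappears and the string operations stay column-vectorized.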