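I have a Dask dataframe with no missing values. I am trying to apply a function to all but the first two columns. In each of those cells, every `0` should be replaced with that row's value in column `R`, every `1` with the row's value in column `A`, and the result lowercased, as follows: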
Sample input:

```
    R  A   T   U   V
0   R  A  00  10  11
1   R  A  00  10  11
2   R  A  00  10  11
3   R  A  00  10  11
4   R  A  00  10  11
..  .. ..  ..  ..  ..
95  R  A  11  00  00
96  R  A  11  00  00
97  R  A  11  00  00
98  R  A  11  00  00
99  R  A  11  00  00
```
The number of distinct strings in each of these columns is very small (fewer than 50); a simplified version of the data is used below.
Output:

```
     T   U   V
0   rr  ar  aa
1   rr  ar  aa
2   rr  ar  aa
3   rr  ar  aa
4   rr  ar  aa
..  ..  ..  ..
95  aa  rr  rr
96  aa  rr  rr
97  aa  rr  rr
98  aa  rr  rr
99  aa  rr  rr
```
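The rule implied by the sample input and output can be written as a small pure-Python function for a single cell, which is handy as a reference when checking any faster implementation. This is only a sketch: the name `transform_cell` is hypothetical, and it mirrors `func` plus `score` from the code below by replacing `0` with the `R` value and `1` with the `A` value, then keeping the first two characters, lowercased.

```python
def transform_cell(cell: str, r: str, a: str) -> str:
    """Apply the question's per-cell rule to one value.

    Hypothetical helper: '0' becomes the row's R value, '1' becomes the
    row's A value, and the first two resulting characters are lowercased
    (this mirrors score() applied after func()'s replacements).
    """
    replaced = cell.replace("0", r).replace("1", a)
    return (replaced[0] + replaced[1]).lower()


# Rows 0 and 99 of the sample input, as (R, A, T, U, V).
sample_rows = [("R", "A", "00", "10", "11"), ("R", "A", "11", "00", "00")]
for r, a, *cells in sample_rows:
    print([transform_cell(c, r, a) for c in cells])
# → ['rr', 'ar', 'aa']
# → ['aa', 'rr', 'rr']
```

Note that the chained `replace` calls would misbehave if the `R` value itself contained the character `1`; that does not happen in this data.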
Current code:
```python
from dask.distributed import Client, progress
import pandas as pd
import dask.dataframe as dd

client = Client(n_workers=20, threads_per_worker=1)
client  # displays the cluster summary in a notebook

def score(x):
    return str(x[0] + x[1]).lower()

def func(row, i, fs):
    # Use .iloc for positional access: row[0] on a Series with string
    # labels relies on deprecated positional fallback in recent pandas.
    r = row.iloc[0]
    a = row.iloc[1]
    row.iloc[i] = fs(row.iloc[i].replace('0', r).replace('1', a))
    return row

s1 = ['00']*25 + ['01']*25 + ['10']*25 + ['11']*25
s2 = ['10']*25 + ['11']*25 + ['01']*25 + ['00']*25
df = pd.DataFrame({'R': ['R']*100, 'A': ['A']*100, 'T': s1, 'U': s2, 'V': s1[::-1]})
ddf = dd.from_pandas(df, npartitions=10)

meta = {cn: 'object' for cn in ddf.columns}
ddf.compute()

# One row-wise apply per column (T, U, V).
for i in range(2, len(ddf.columns)):
    ddf = ddf.apply(func, args=(i, score), axis=1, meta=meta)
ddf = ddf.drop(['R', 'A'], axis=1)
ddf.compute()
```
I tried using numba and similar tools but did not have success. I would be very grateful for any improvements to the above code.
This specific example can be vectorized, or rewritten as simpler statements, for greater efficiency. However, if the actual code is more complex and the separation of `score` and `func` is necessary, then the main bottleneck is in these lines:
```python
for i in range(2, len(ddf.columns)):
    ddf = ddf.apply(func, args=(i, score), axis=1, meta=meta)
```
The code above is a bottleneck because operations that could be done together are being done sequentially: each `apply` call adds another full row-wise pass over the data, and the column corresponding to `i=3` cannot be processed until the column corresponding to `i=2` has been processed.
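The cost difference can be illustrated without Dask. The following is a pure-Python analogy, not Dask's actual scheduling: rows are plain tuples, and a counter records how many times a row-level function runs. The chained version makes one full pass per column, as the loop of `apply` calls does, while the single-pass version handles every column in one call per row. All helper names here are hypothetical.

```python
def transform_cell(cell, r, a):
    # Same per-cell rule as the question: '0' -> R value, '1' -> A value.
    replaced = cell.replace("0", r).replace("1", a)
    return (replaced[0] + replaced[1]).lower()


calls = {"row_function": 0}


def update_one_column(row, i):
    # Like func(row, i, score): rewrite column i only, return the whole row.
    calls["row_function"] += 1
    row = list(row)
    row[i] = transform_cell(row[i], row[0], row[1])
    return row


def update_all_columns(row):
    # Like a func that loops over the columns itself: one call per row.
    calls["row_function"] += 1
    r, a = row[0], row[1]
    return [transform_cell(cell, r, a) for cell in row[2:]]


def chained_passes(rows):
    # One full pass over all rows per column (i = 2, 3, 4).
    for i in range(2, len(rows[0])):
        rows = [update_one_column(row, i) for row in rows]
    return [row[2:] for row in rows]


def single_pass(rows):
    # One pass over all rows, handling every column inside the row function.
    return [update_all_columns(row) for row in rows]


s1 = ["00"] * 25 + ["01"] * 25 + ["10"] * 25 + ["11"] * 25
s2 = ["10"] * 25 + ["11"] * 25 + ["01"] * 25 + ["00"] * 25
rows = [("R", "A", t, u, v) for t, u, v in zip(s1, s2, s1[::-1])]

calls["row_function"] = 0
chained = chained_passes(rows)
chained_calls = calls["row_function"]

calls["row_function"] = 0
single = single_pass(rows)
single_calls = calls["row_function"]

print(chained == single, chained_calls, single_calls)
# → True 300 100
```

Both versions produce the same result, but the chained one invokes the row-level function three times per row, which is the overhead the loop of `apply` calls pays.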
A simple way to remove this bottleneck is to put the loop over columns inside `func`, so that each row is processed in a single `apply` call:
```python
import pandas as pd
from dask.dataframe import from_dict

def score(x):
    return str(x[0] + x[1]).lower()

def func(row, fs):
    # Positional access via .iloc; R and A are the first two columns.
    r = row.iloc[0]
    a = row.iloc[1]
    new_row = []
    # Transform every remaining column in this single pass over the row.
    for cell in row.iloc[2:]:
        _value = fs(cell.replace("0", r).replace("1", a))
        new_row.append(_value)
    # Returning a Series makes apply(axis=1) build a DataFrame whose
    # columns keep the original names (T, U, V).
    return pd.Series(new_row, index=row.index[2:])

s1 = ["00"] * 25 + ["01"] * 25 + ["10"] * 25 + ["11"] * 25
s2 = ["10"] * 25 + ["11"] * 25 + ["01"] * 25 + ["00"] * 25
data = {"R": ["R"] * 100, "A": ["A"] * 100, "T": s1, "U": s2, "V": s1[::-1]}
ddf = from_dict(data, npartitions=10)

meta = {"T": "object", "U": "object", "V": "object"}
ddf = ddf.apply(func, args=(score,), axis=1, meta=meta)
ddf.compute()
```
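Because the question notes that each column holds fewer than 50 distinct strings, another option (a different technique from the answer above, sketched under that assumption rather than as a tested Dask recipe) is to compute each distinct result once and reuse it. The sketch below memoizes the per-cell work with `functools.lru_cache`, keyed on `(cell, r, a)`; the helper names `cached_transform` and `transform_row` are hypothetical.

```python
from functools import lru_cache


@lru_cache(maxsize=None)
def cached_transform(cell: str, r: str, a: str) -> str:
    # Computed once per distinct (cell, r, a) combination, then reused.
    replaced = cell.replace("0", r).replace("1", a)
    return (replaced[0] + replaced[1]).lower()


def transform_row(row):
    # Hypothetical row-level helper: the first two fields are R and A.
    r, a = row[0], row[1]
    return [cached_transform(cell, r, a) for cell in row[2:]]


s1 = ["00"] * 25 + ["01"] * 25 + ["10"] * 25 + ["11"] * 25
s2 = ["10"] * 25 + ["11"] * 25 + ["01"] * 25 + ["00"] * 25
rows = [("R", "A", t, u, v) for t, u, v in zip(s1, s2, s1[::-1])]

result = [transform_row(row) for row in rows]
info = cached_transform.cache_info()
print(result[0], result[99])
# → ['rr', 'ar', 'aa'] ['aa', 'rr', 'rr']
print(info.misses, info.hits)
# → 4 296
```

Only 4 of the 300 cell lookups do any string work here. The saving is in the per-cell computation, not in the number of Python-level calls; if `R` and `A` are constant, as in the sample, the same idea could instead be expressed in pandas as a precomputed dict passed to `Series.map` inside `ddf.map_partitions`, though that variant is not shown or tested here.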