Tags: gpu, dask, dask-distributed, rapids, cudf

How to parallelize GPU processing of a Dask dataframe


I would like to use Dask to parallelize data processing with dask-cudf from a Jupyter notebook on multiple GPUs.

import cudf 
from dask.distributed import Client, wait, get_worker, get_client 
from dask_cuda import LocalCUDACluster 
import dask.dataframe as dd 
import pandas as pd 
import random 

cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1", n_workers=2, threads_per_worker=4, memory_limit="12GB",  device_memory_limit="20GB", rmm_pool_size="2GB", rmm_maximum_pool_size="10GB") 

client = Client(cluster) 

client.run(cudf.set_allocator, "managed") 

worker_info = client.scheduler_info()["workers"] 

for worker, info in worker_info.items(): 
    print(worker) 

# tcp://127.x.x.x:xxxxxx 
# tcp://127.x.x.x:xxxxxx 

df = pd.DataFrame({'col_1':random.sample(range(10**3), 10**3), 'col_2': random.sample(range(10**3), 10**3) }) 

ddf = dd.from_pandas(df, npartitions=8) 

def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1] * df[col_2])

ddf_out = ddf.map_partitions(test_f, 'col_1', 'col_2',  meta={'col_1':'int64', 'col_2':'int64', 'result':'int64'}) 

The code runs without errors, but GPU utilization stays at 0. I expected the 8 partitions to be distributed across the 2 GPUs, which would process them in parallel and speed up the computation.

Could anybody let me know if I missed anything?


Solution

  • You are creating a Dask dataframe from pandas input, so its partitions are also pandas, not cuDF, and all of the work stays on the CPU. You need one of the following:

    import dask_cudf
    import cudf
    
    gdf = cudf.DataFrame.from_pandas(df)
    dask_gdf = dask_cudf.from_cudf(gdf, npartitions=2)
    

    or

    dask_df = ddf.map_partitions(cudf.from_pandas)
    

    to get a partitioned dataframe on the GPU. The former allocates on the GPU immediately, the latter at compute time.
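
    Once the partitions are cuDF dataframes, the same map_partitions call from the question runs on the GPUs. A minimal sketch, assuming the cluster, client, df and test_f from the question are already defined (meta is omitted here and left for Dask to infer):

    import dask_cudf

    # move the pandas dataframe to GPU memory and split it across the workers
    gdf = cudf.DataFrame.from_pandas(df)
    dask_gdf = dask_cudf.from_cudf(gdf, npartitions=8)

    # each partition is now a cudf.DataFrame, so this executes on the GPUs
    ddf_out = dask_gdf.map_partitions(test_f, 'col_1', 'col_2')
    result = ddf_out.compute()  # triggers the distributed GPU computation
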

    from_pandas is very likely not what you actually want. You should instead load your data directly from CSV/Parquet into the GPU (using Dask), for example:
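
    A minimal sketch, assuming dask_cudf is installed; the file paths are placeholders:

    import dask_cudf

    # read directly into GPU memory; each file chunk becomes a cuDF partition
    gddf = dask_cudf.read_csv("data/*.csv")

    # or, for Parquet input:
    gddf = dask_cudf.read_parquet("data/")

    This keeps the data on the GPU from the start and avoids the pandas-to-cuDF copy entirely.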