I would like to use Dask to parallelize data processing with dask-cudf across multiple GPUs from a Jupyter notebook.
import cudf
from dask.distributed import Client, wait, get_worker, get_client
from dask_cuda import LocalCUDACluster
import dask.dataframe as dd
import pandas as pd
import random
cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1", n_workers=2, threads_per_worker=4, memory_limit="12GB", device_memory_limit="20GB", rmm_pool_size="2GB", rmm_maximum_pool_size="10GB")
client = Client(cluster)
client.run(cudf.set_allocator, "managed")
worker_info = client.scheduler_info()["workers"]
for worker, info in worker_info.items():
    print(worker)
# tcp://127.x.x.x:xxxxxx
# tcp://127.x.x.x:xxxxxx
df = pd.DataFrame({'col_1':random.sample(range(10**3), 10**3), 'col_2': random.sample(range(10**3), 10**3) })
ddf = dd.from_pandas(df, npartitions=8)
def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1]*df[col_2])
ddf_out = ddf.map_partitions(test_f, 'col_1', 'col_2', meta={'col_1':'int64', 'col_2':'int64', 'result':'int64'})
The code runs without errors, but GPU utilization stays at 0. I expected the 8 partitions to be distributed across the 2 GPUs so they could be processed in parallel and the computation sped up.
Could anybody let me know if I missed anything?
You are creating a Dask DataFrame from pandas input, so its partitions are also pandas DataFrames, not cuDF ones. You need something like the following:
import dask_cudf
import cudf
gdf = cudf.DataFrame.from_pandas(df)
dask_gdf = dask_cudf.from_cudf(gdf, npartitions=2)
or
dask_df = ddf.map_partitions(cudf.from_pandas)
to get a partitioned dataframe on the GPU. The former allocates on the GPU immediately, the latter at compute time.
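As a rough sketch of the second approach (reusing ddf, test_f, client, and wait from the question; this is an illustration, not the only way to do it):
dask_gdf = ddf.map_partitions(cudf.from_pandas)   # partitions become cuDF DataFrames at compute time

# test_f now receives cuDF DataFrames, so the column arithmetic runs on the GPUs;
# meta is left out here so dask infers it from an empty cuDF partition
gdf_out = dask_gdf.map_partitions(test_f, 'col_1', 'col_2')

# persist() forces execution on the workers, which is when GPU utilization shows up
gdf_out = gdf_out.persist()
wait(gdf_out)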
That said, from_pandas is very likely not what you actually want. You should instead load your data directly from CSV/Parquet into the GPU (using Dask).
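For example, a minimal sketch with the dask_cudf readers (the file paths below are placeholders for your actual data, and test_f is the function from the question):
import dask_cudf

# Read straight into cuDF-backed partitions on the GPU workers
gddf = dask_cudf.read_parquet("data/*.parquet")
# or, for CSV input:
# gddf = dask_cudf.read_csv("data/*.csv")

gddf_out = gddf.map_partitions(test_f, 'col_1', 'col_2')
gddf_out = gddf_out.persist()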