python, python-3.x, pandas, dask, dask-dataframe

`ValueError: cannot reindex from a duplicate axis` using Dask DataFrame


I've been trying to adapt my code to use Dask so I can spread the processing across multiple machines. While the initial data load is not time-consuming, the subsequent processing takes roughly 12 hours on an 8-core i5. That isn't ideal, so I figured Dask could help by distributing the work. The following code works fine with the standard Pandas approach:

import pandas as pd
artists = pd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")

artists["name"] = artists["name"].astype("str")

artists["name"] = (
    artists["name"]
    .str.encode("ascii", "ignore")
    .str.decode("ascii")
    .str.lower()
    .str.replace("&", " and ", regex=False)
    .str.strip()
)

Converting to Dask seemed straightforward, but I've hit a few hiccups along the way. The following Dask-adapted code throws a `ValueError: cannot reindex from a duplicate axis` error:

import dask.dataframe as dd
from dask.distributed import Client

artists = dd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")

artists["name"] = artists["name"].astype(str).compute()
artists["name"] = (
    artists["name"]
    .str.encode("ascii", "ignore")
    .str.decode("ascii")
    .str.lower()
    .str.replace("&", " and ", regex=False)
    .str.strip().compute()
)

if __name__ == '__main__':
    client = Client()

The best I can discern is that Dask won't allow reassignment to an existing Dask DataFrame. So this works:

...
artists_new = artists["name"].astype("str").compute()
...

However, I really don't want to create a new DataFrame each time. I'd rather replace the existing DataFrame with a new one, mainly because I have multiple data cleaning steps before processing.

While the tutorial and guides are useful, they are pretty basic and don't cover such use cases.

What are the preferred approaches here with Dask DataFrames?


Solution

  • Every time you call `.compute()` on a Dask DataFrame/Series, it is converted into a pandas object. So what is happening in this line

    artists["name"] = artists["name"].astype(str).compute()

    is that you are computing the string column into a pandas Series and then assigning that pandas Series back onto a Dask Series, without ensuring that the partitions align — which is what triggers the duplicate-axis error. The solution is to call `.compute()` only on the final result; intermediate steps can use the regular pandas-style syntax:

    # modified example (.compute is removed)
    artists["name"] = artists["name"].astype(str).str.lower()