Tags: python, pandas, dataframe, dask, memory-profiling

Dask: Running out of memory during filtering (MRE)


tl;dr

I want to filter a Dask dataframe based on a value of a column, i.e.

data.loc[data[column].lt(value)].to_parquet(path)

but I run out of memory doing so, despite each partition being 20 times smaller than the available memory.

Sample data

Let's first create some sample data to work with

import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame(np.random.uniform(size=int(5e8)), columns=['val'])
ddf = dd.from_pandas(df, npartitions=800)  # each partition has about 10Mb in memory
ddf.to_parquet('sample.parq')
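
As a rough sanity check of that estimate: 5e8 rows split into 800 partitions gives 625,000 float64 values per partition, i.e. about 5MB of data, or roughly 10MB once the 64-bit index written to parquet is counted as well. A minimal way to verify this, assuming the files written above:

import dask.dataframe as dd

# Load just the first partition back from parquet and measure its in-memory size.
part = dd.read_parquet('sample.parq').partitions[0].compute()
print(f"{part.memory_usage(deep=True).sum() / 1e6:.1f} MB")  # roughly 10 MB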

Solution attempt

Let's assume that my machine only has 512MB of memory. Of course, my actual problem is at a much larger scale (terabytes, in my case), but this simple example seems to capture the same issue I hit with the full dataset.

I will thus use two workers with 200MB each:

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, memory_limit='200Mb')
client = Client(cluster)

data = dd.read_parquet('sample.parq')
task = data.loc[data['val'].lt(0.5)].to_parquet('sample-output.parq', compute=False)

Since each partition takes 10MB in memory, according to Wes McKinney's rule of thumb I might need at most 50-100MB of memory to process a partition, so 200MB should be more than enough.

However, when I run task.compute(), the workers almost immediately run out of memory, get restarted, and are later killed altogether.

Things I tried

Limiting resources

I have also tried to limit worker resources. As far as I understand, this should tell each worker that it can only process one task at a time. Perhaps this is too conservative, but in that case I'd expect a deadlock, not an out-of-memory crash.

cluster = LocalCluster(n_workers=2, memory_limit='200Mb', resources={'m': 200})
task.compute(resources={'m': 200})

However, the result is sadly the same.

Profiling memory usage of reading parquet

SultanOrazbayev suggested using memory_profiler to see how much memory is used while loading a single partition, since the memory usage of read_parquet is likely the culprit here.

I wrote test-load.py

import pandas as pd

# The @profile decorator is injected into builtins by memory_profiler,
# so no import is needed when running via `python -m memory_profiler`.
@profile
def load_parq():
    return pd.read_parquet('sample.parq/part.0.parquet')

if __name__ == '__main__':
    df = load_parq()
    print(f'Memory footprint: {df.memory_usage(deep=True).sum() // 1e6}MB')

And ran it using python3 -m memory_profiler test-load.py. This is the output:

Memory footprint: 10.0MB
Filename: test-load.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     3   74.750 MiB   74.750 MiB           1   @profile
     4                                         def load_parq():
     5  132.992 MiB   58.242 MiB           1       return pd.read_parquet('sample.parq/part.0.parquet')

And, fair enough, even reading a single file requires more memory than I thought: almost 60MB just to materialise a 10MB partition. So 200MB might indeed not be enough, but how much is?

The answer, on my setup, turns out to be about 4GB for each of the two workers. That is as much as the entire dataset. And in fact, looking at the dashboard, Dask seems to be happily loading dozens of partitions at once. That's fine if it has the 4GB of memory, but how can I proceed if it doesn't have that much?


Solution

  • I think I have figured out the culprit in this case. The problem is that Dask automatically attempted to use as many cores as were available.

    from dask.distributed import Client
    with Client(n_workers=2, memory_limit='300Mb') as client:
        print(client)
    

    produces

    <Client: 'tcp://127.0.0.1:39337' processes=2 threads=48, memory=600.00 MB>
    

    When I thus attempt to read the parquet data, Dask uses all 48 available cores at once; with each in-flight partition needing on the order of 50-100MB, that blows straight past the 300MB per-worker limit, and the workers instantly run out of memory.

    The trick here is to limit the number of threads per worker:

    with Client(n_workers=2, memory_limit='300Mb', threads_per_worker=1) as client:
        print(client)
    

    which yields

    <Client: 'tcp://127.0.0.1:34421' processes=2 threads=2, memory=600.00 MB>
    

    and the computation then proceeds without any problems, using about 200-250MB per worker at any point in time.
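
    For completeness, here is the whole recipe stitched together in one place; this is a sketch using the same file names and limits as above, not a verbatim copy of my session:

    import dask.dataframe as dd
    from dask.distributed import Client

    # One thread per worker means at most two partitions are in flight at a time,
    # which is what keeps usage in the 200-250MB per-worker range mentioned above.
    with Client(n_workers=2, memory_limit='300Mb', threads_per_worker=1) as client:
        data = dd.read_parquet('sample.parq')
        data.loc[data['val'].lt(0.5)].to_parquet('sample-output.parq')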
