I know similar questions have been asked before, but their solutions were not very helpful. I suspect the best solution depends on the specific cluster configuration, so I'm including more details here about my cluster and my error.
import dask.dataframe as dd
import dask.bag as db
import json
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
Here are my cluster settings:
cluster.scheduler
#output:
Scheduler: tcp://127.0.0.1:35367 workers: 8 cores: 48 tasks: 0
cluster.workers
#output:
{0: <Nanny: tcp://127.0.0.1:43789, threads: 6>,
1: <Nanny: tcp://127.0.0.1:41375, threads: 6>,
2: <Nanny: tcp://127.0.0.1:42577, threads: 6>,
3: <Nanny: tcp://127.0.0.1:40171, threads: 6>,
4: <Nanny: tcp://127.0.0.1:32867, threads: 6>,
5: <Nanny: tcp://127.0.0.1:46529, threads: 6>,
6: <Nanny: tcp://127.0.0.1:41535, threads: 6>,
7: <Nanny: tcp://127.0.0.1:39645, threads: 6>}
client
#output
Client
Scheduler: tcp://127.0.0.1:35367
Dashboard: http://127.0.0.1:8787/status
Cluster
Workers: 8
Cores: 48
Memory: 251.64 GiB
Here is my data loading code:
b = db.read_text('2019-12-16-latest-level.json').map(json.loads)
def flatten(record):
    return {
        'uuid': record['uuid'],
        'stored_at': record['stored_at'],
        'duration': record['duration']
    }
All of the code above runs fine. Here is the part that ran into trouble:
df = b.map(flatten).to_dataframe()
df.head()
The code ran for around a day and then gave the warnings below:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
Then, after roughly another day, the program stopped and gave me the error below:
---------------------------------------------------------------------------
KilledWorker Traceback (most recent call last)
<ipython-input-10-84f98622da69> in <module>
1 df = b.map(flatten).to_dataframe()
----> 2 df.head()
And here are the last few lines of the error report:
KilledWorker: ("('bag-from-delayed-file_to_blocks-list-loads-flatten-0daa9cba16c635566df6215c209f653c', 0)", <WorkerState 'tcp://127.0.0.1:41535', name: 6, memory: 0, processing: 1>)
Screenshots of the full error report are also attached:
Any suggestions on how to deal with this issue? Thanks.
I have been using dask, with mixed results, for about a month now. My personal belief is that the software has some kind of deadly embrace (deadlock) in its memory management when executing task graphs. A typical pattern is for dask to compute 95% of a large computation in just a few minutes, then spend the next 8 hours crunching away at the last 5%, appearing to do nothing, before it crashes or I run out of compute budget. This is very frustrating.
That said, I have had some limited success using fewer workers, or confining workers to processes rather than threads. So, on a 16-core machine I might do:
client = Client(processes=True, threads_per_worker=1)  # separate worker processes, one thread each
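On your 48-core, ~250 GiB machine you could also reduce the worker count and cap per-worker memory explicitly. A minimal sketch of that idea is below; the worker count and memory limit are guesses I haven't tuned for your data, not recommended values:

import dask.bag as db
from dask.distributed import Client, LocalCluster

# Sketch: fewer, single-threaded worker processes with an explicit
# per-worker memory cap (the numbers are assumptions, not tested values).
cluster = LocalCluster(
    n_workers=8,            # fewer workers than cores
    threads_per_worker=1,   # one thread per worker process
    memory_limit='28GB',    # per-worker cap; 8 x 28 GB ~ 224 GB total
)
client = Client(cluster)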
Another important thing to do is persist intelligently. Persisting means there are fewer tasks in the graph (and, as a result, in memory) at any given time. So if I want to read a bag from a JSON file, I persist the bag before converting it to a dataframe; otherwise the reading and the conversion both happen at the compute() step, and I find that is a recipe for failure. A sketch of what I mean is below.
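Applied to the code in your question, it would look roughly like this (a sketch only; the filename and flatten function are copied from the question, and I haven't run this on your data):

import json
import dask.bag as db

b = db.read_text('2019-12-16-latest-level.json').map(json.loads)

def flatten(record):
    return {
        'uuid': record['uuid'],
        'stored_at': record['stored_at'],
        'duration': record['duration']
    }

# Persist the flattened bag first, so the text-reading and JSON-parsing
# tasks are materialized on the workers and dropped from the graph
# before the dataframe conversion runs.
flat = b.map(flatten).persist()
df = flat.to_dataframe()
df.head()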
However, as I said, I have found dask quite disappointing, considering all it appears on the surface to be capable of. I'm switching to vaex instead.
Sorry I was unable to help more.