
Why do my Dask Futures get stuck in 'pending' and never finish?


I have some long-running code (~5-10 minute processing) that I'm trying to run as a Dask Future. It's a series of several discrete steps that I can either run as one function:

result: Future = client.submit(my_function, arg1, arg2)

Or I can split up into intermediate steps:

# compose the result from the same intermediate results but with Futures
intermediate1 = client.submit(my_function1, arg1)
intermediate2 = client.submit(my_function2, arg1, arg2)
intermediate3 = client.submit(my_function3, intermediate2, arg1)
result = client.submit(my_function4, intermediate3)
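Here is a rough, self-contained sketch of both shapes. It assumes hypothetical stand-in step functions (tiny arithmetic instead of minutes of processing) and uses an in-process local cluster instead of the Kubernetes one. Futures can be passed directly into later client.submit calls, and Dask resolves them to their results on the workers before calling the next function:

```python
from dask.distributed import Client

# Hypothetical stand-ins for the real step functions; the real steps take minutes.
def step2(arg1, arg2):
    return arg1 + arg2

def step3(intermediate2, arg1):
    return intermediate2 * arg1

def step4(intermediate3):
    return intermediate3 - 1

def whole_pipeline(arg1, arg2):
    # The same steps run as a single function, like my_function.
    return step4(step3(step2(arg1, arg2), arg1))

# An in-process local cluster so the sketch runs anywhere; a deployed app
# would connect its Client to the real scheduler instead.
with Client(processes=False) as client:
    arg1, arg2 = 3, 4

    # Shape 1: one task that runs the whole pipeline.
    single = client.submit(whole_pipeline, arg1, arg2)

    # Shape 2: one task per step. Passing a future as an argument makes Dask
    # wait for it and hand its result (not the Future) to the next function.
    intermediate2 = client.submit(step2, arg1, arg2)
    intermediate3 = client.submit(step3, intermediate2, arg1)
    chained = client.submit(step4, intermediate3)

    # .result() blocks until each task finishes.
    single_result = single.result()
    chained_result = chained.result()

print(single_result, chained_result)  # 20 20
```

Because `single` and `chained` are still referenced when `.result()` is called, Dask keeps both tasks and their results alive until then.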

If I run this locally (e.g., result = my_function(arg1, arg2)), it completes. If I submit it to Dask, I immediately get my Future back, as expected, but the job never completes. Further, if I grab result.key as a way to track the status of the job and later reconstruct the future as result = Future(key), it always has a state of pending.

I want to first get it running as-is so that my processing is offloaded to my Dask workers instead of running in the API that handles the requests. After that, I want to start splitting the work across nodes to improve performance. But why are my jobs just evaporating? When I look at the Dask scheduler web interface, the jobs don't even appear to show up. I know Dask itself is working, though, because I can submit code to it from my Jupyter notebook.

I'm calling client.submit from a Flask server, and I'm returning the key so it can be used later. Roughly:

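from dask.distributed import Future
from flask import jsonify

# `app` (a Flask app) and `client` (a dask.distributed Client) are set up elsewhere.

@app.route('/submit')
def submit():
    # ...
    future = client.submit(my_function, arg1, arg2)
    return jsonify({"key": future.key})

@app.route('/status/<key>')
def status(key):
    future = Future(key)
    return jsonify({"status": future.status})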

When my application is deployed to Kubernetes, my /submit route returns a Future key, but the Dask status page doesn't show any task being processed. If I run Flask locally, I do see a task show up, and the output of my job appears after the expected delay; however, when I hit my own /status/<key> route with the key returned from /submit, the state is always pending.


Solution

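  • If all futures pointing to a task disappear, Dask is free to forget about that task. This lets Dask clean up work rather than keep every intermediate result around forever. In your /submit route the only future is a local variable, so it is released as soon as the request returns, and Dask can drop the task. Creating a new Future(key) later does not resubmit the work; it only refers to a key the scheduler may no longer be tracking, so its status stays pending.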

    If you want Dask to keep a result, you need to hold on to its future. Holding a reference tells Dask that you still care about the result. You can do this locally in your Flask app with a dictionary keyed by future.key:

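    from flask import jsonify

    # `app` (a Flask app) and `client` (a dask.distributed Client) are set up as before.
    futures = {}  # future.key -> Future; holding the Future keeps Dask from forgetting the task
    
    @app.route('/submit')
    def submit():
        # ...
        future = client.submit(my_function, arg1, arg2)
        futures[future.key] = future
        return jsonify({"key": future.key})
    
    @app.route('/status/<key>')
    def status(key):
        future = futures.get(key)
        if future is None:
            return jsonify({"error": "unknown key"}), 404
        return jsonify({"status": future.status})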
    

    But you'll also want to decide when you can release those futures. With this approach, every result stays in worker memory until you remove its future from the dictionary, so memory usage will slowly grow.
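One possible way to bound that growth is to drop a future from the dictionary once its result has been collected. The `FutureRegistry` class below is a hypothetical helper, not part of Dask or Flask. It wraps the dictionary with a lock, because Flask may serve requests from several threads. It releases a future only after the task is done and its result has been read:

```python
import threading
from typing import Any, Dict, Optional, Tuple

from dask.distributed import Future


class FutureRegistry:
    """Hypothetical helper (not part of Dask or Flask): holds futures by key.

    While a future is stored here, Dask keeps its task and result alive.
    Popping it drops that reference so Dask can free the result.
    """

    def __init__(self) -> None:
        self._futures: Dict[str, Future] = {}
        # Flask may handle requests on several threads at once.
        self._lock = threading.Lock()

    def add(self, future: Future) -> str:
        with self._lock:
            self._futures[future.key] = future
        return future.key

    def status(self, key: str) -> Optional[str]:
        """Return the future's status (e.g. 'pending', 'finished'), or None if unknown."""
        with self._lock:
            future = self._futures.get(key)
        return None if future is None else future.status

    def pop_result(self, key: str) -> Tuple[bool, Any]:
        """Return (True, result) and release the future once its task is done.

        Returns (False, None) and keeps the future while the task is still
        running. Raises KeyError for an unknown key, and re-raises the task's
        exception if it failed (the future is released in that case too).
        """
        with self._lock:
            future = self._futures[key]
            if not future.done():
                return False, None
            del self._futures[key]
        try:
            return True, future.result()
        finally:
            future.release()
```

In the Flask app above, /submit would call `registry.add(future)` and /status/<key> would call `registry.status(key)` (returning 404 for None). A hypothetical /result/<key> route could call `registry.pop_result(key)`. Results that nobody ever collects would still accumulate, so a real deployment may also need some age-based expiry.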