Tags: python, dask, job-queue

Dask job queue design pattern?


Let's say I have a simple costly function that stores some results to a file:

import time

def costly_function(filename):
    time.sleep(10)
    with open(filename, 'w') as f:
        f.write("I am done!")

Now let's say I would like to schedule a number of these tasks in dask, which should then take the requests asynchronously and run the functions one by one. I'm currently setting up a dask client object...

import dask.distributed

cluster = dask.distributed.LocalCluster(n_workers=1, processes=False)  # my attempt at sequential job processing
client = dask.distributed.Client(cluster)

... and then interactively (from IPython) scheduling these jobs:

>>> client.submit(costly_function, "result1.txt")
>>> client.submit(costly_function, "result2.txt")
>>> client.submit(costly_function, "result3.txt")

The issue I'm running into is that these tasks are not running consecutively but in parallel, which in my particular case is causing concurrency issues.

So my question is: What is the correct way to set up a job queue like the one I described above in dask?


Solution

  • OK, I think I might have a solution (feel free to come up with better ones though!). It requires modifying the previous costly_function slightly:

    import time
    import dask.distributed

    def costly_function(filename, prev_job=None):
        # prev_job is never used inside the function; it only exists so that a
        # previous future can be passed in, making this task depend on (and
        # therefore wait for) the previous one.
        time.sleep(10)
        with open(filename, 'w') as f:
            f.write("I am done!")

    cluster = dask.distributed.LocalCluster(n_workers=1, processes=False)  # my attempt at sequential job processing
    client = dask.distributed.Client(cluster)
    

    And then in an interactive context you would write the following, passing the previous future as prev_job so that each new task depends on the previous one and is only started once it has finished:

    >>> future = client.submit(costly_function, "result1.txt")
    >>> future = client.submit(costly_function, "result2.txt", prev_job=future)
    >>> future = client.submit(costly_function, "result3.txt", prev_job=future)