I am running tasks using client.submit, like this:
from dask.distributed import Client, get_client, wait, as_completed
# other imports
zip_and_upload_futures = [ client.submit(zip_and_upload, id, path, file_list) for id, file_list in enumerate(file_lists) ]
upload_futures = []
failed_list = []
for future in as_completed(zip_and_upload_futures):
    result = future.result()
    upload_futures.append(result[0])
    print(f'Zip {result[1]} created and added to upload queue.')
    del future

for future in as_completed(upload_futures):
    if 'finished' in future.status:
        print(f'Zip {future.result()[0]} uploaded.')
        del future
    elif 'exception' in future.status or 'error' in future.status:
        failed_list.append((future.exception(), future.traceback()))
        del future
In zip_and_upload(id, path, file_list), files are zipped to in-memory file-like (BytesIO) objects, since I'm limited on local disk, and the zip objects are then uploaded:
def zip_and_upload(id, path, file_list):
    client = get_client()
    zip_future = client.submit(zip_file_list, id, file_list, compression)
    wait(zip_future)
    results = zip_future.result()  # tuple == (zip_data, name_list)
    future = client.submit(upload, id, results[0], results[1])
    return future, f'{path}_{id}.zip'
I get a lot of errors from the scheduler along the lines of distributed.scheduler - ERROR - Couldn't gather keys: {'zip_file_list-<hash>': 'queued'}
Questions:
Many thanks.
As described in the documentation on launching tasks from tasks, this pattern can deadlock the scheduler if too many tasks request jobs at once. Each task does not communicate to the scheduler that it is waiting on results and is free to compute other tasks. This can deadlock the cluster if every scheduling slot is running a task and they all request more tasks.
I think this is what you are encountering. I don't think you really need to submit these tasks, especially since you wait on the first one before submitting the second, so maybe just remove the client.submit calls inside your nested function and call the functions directly (see the sketch below).
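For example, a minimal sketch with the nested submits removed (untested, keeping your zip_file_list, upload, and compression as they are):

def zip_and_upload(id, path, file_list):
    # Do the work directly in this task instead of submitting
    # new tasks from inside it.
    zip_data, name_list = zip_file_list(id, file_list, compression)
    upload_result = upload(id, zip_data, name_list)
    return upload_result, f'{path}_{id}.zip'

With this version each zip_and_upload future only completes once its upload has finished, so the second as_completed loop over upload_futures is no longer needed.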
If you really need this kind of workflow, then please read the page already mentioned:
To avoid this deadlocking issue we can use secede and rejoin. These functions will remove and rejoin the current task from the cluster respectively.
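For example, a rough sketch of your nested function using secede/rejoin (untested, assuming your existing zip_file_list, upload, and compression):

from dask.distributed import get_client, secede, rejoin

def zip_and_upload(id, path, file_list):
    client = get_client()
    zip_future = client.submit(zip_file_list, id, file_list, compression)
    # Give this worker slot back to the scheduler while we block
    # on the nested task, so other tasks can run in the meantime.
    secede()
    zip_data, name_list = zip_future.result()
    # Re-acquire a slot before continuing work in this task.
    rejoin()
    upload_future = client.submit(upload, id, zip_data, name_list)
    return upload_future, f'{path}_{id}.zip'

Alternatively, the worker_client context manager handles the secede/rejoin for you.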