
Retrying failed futures in Python's ThreadPoolExecutor


I want to implement retry logic with Python's concurrent.futures.ThreadPoolExecutor. I would like the following properties:

  1. A new future is added to the work queue as soon as an existing one fails.
  2. A retried future can be retried again, either indefinitely or up to a maximum retry count.

A lot of the existing code I found online basically operates in "rounds": it calls as_completed on an initial list of futures, resubmits failed futures while gathering them into a new list, and then goes back to calling as_completed on the new list if it's not empty. Basically something like this:

with concurrent.futures.ThreadPoolExecutor(...) as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        futures = new_futures

However, I think that doesn't satisfy the first property, since it's possible that a retried future completes before the initial futures, but we won't process it until all the initial futures complete.

Here's a hypothetical pathological case. Say we have two jobs: the first runs for 1 second but has a 90% chance of failure, while the second runs for 100 seconds. If our executor has 2 workers and the first job fails after 1 second, we'll retry it immediately. But if it fails again, we won't be able to retry it until the second job completes.
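To make this concrete, here is a sketch of such a job function (flaky is a hypothetical name; the timings and failure rate are the ones above). Fed to the round-based loop, a second failure of the short job sits unretried until the long job finishes:

import random
import time

def flaky(job):
    if job == "short":
        time.sleep(1)              # 1-second job...
        if random.random() < 0.9:  # ...with a 90% chance of failure
            raise RuntimeError("transient failure")
        return job
    time.sleep(100)                # 100-second job, always succeeds
    return job

jobs = ["short", "long"]  # plug in as fn=flaky, jobs=jobs above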


So my question is: is it possible to implement retry logic with these desired properties, without using external libraries or rewriting low-level executor logic? One thing I tried is putting the retry logic in the code sent to the worker:

def worker_job(fn):
    try:
        return fn()
    except Exception:
        executor.submit(fn)

with concurrent.futures.ThreadPoolExecutor(...) as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    executor.map(worker_job, jobs)

But it seems like submitting new jobs from within a job doesn't work.
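The failure is silent because nothing consumes the results of executor.map. One way to surface it (a sketch reusing fn and args from above) is to submit explicitly, keep the futures, and inspect their exceptions afterwards; the resubmission inside worker_job typically raises RuntimeError("cannot schedule new futures after shutdown"), which otherwise just gets stored in an unconsumed future:

with concurrent.futures.ThreadPoolExecutor() as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    futures = [executor.submit(worker_job, job) for job in jobs]

for f in futures:
    # a job whose fn() raised typically carries the RuntimeError
    # from the executor.submit call inside worker_job
    print(f.exception())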


Solution

  • Retry using as_completed

    Simple way

    Loop with wait(..., return_when=FIRST_COMPLETED) instead of as_completed(...).

    Trade-offs:

    1. Overhead from the pending futures on every iteration: the waiter is re-installed and new_futures is rebuilt each time around the loop.
    2. Troublesome if you want to specify an overall timeout.

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(fn, job): job for job in jobs}
        while len(futures) > 0:
            new_futures = {}
            done, pending = concurrent.futures.wait(
                futures, return_when=concurrent.futures.FIRST_COMPLETED)
            for fut in done:
                if fut.exception():
                    job = futures[fut]
                    new_futures[executor.submit(fn, job)] = job
                else:
                    ...  # logic to handle successful job
            for fut in pending:
                job = futures[fut]
                new_futures[fut] = job
            futures = new_futures
    
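    A maximum retry count (the second desired property) drops into this loop easily: track the attempt count alongside each job. A minimal sketch, where MAX_RETRIES is an assumed limit rather than anything from the question:

    MAX_RETRIES = 3  # assumed limit

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # map each future to (job, attempts so far)
        futures = {executor.submit(fn, job): (job, 1) for job in jobs}
        while len(futures) > 0:
            done, pending = concurrent.futures.wait(
                futures, return_when=concurrent.futures.FIRST_COMPLETED)
            new_futures = {fut: futures[fut] for fut in pending}
            for fut in done:
                job, attempts = futures[fut]
                if fut.exception() and attempts < MAX_RETRIES:
                    new_futures[executor.submit(fn, job)] = (job, attempts + 1)
                else:
                    ...  # success, or retries exhausted
            futures = new_futures
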

    Efficient way

    Tweak as_completed(...) so that newly submitted futures can be added to its fs and pending sets, reusing its internal waiter.

    Trade-off: maintenance, since this copies private internals of concurrent.futures that may change between Python versions.

    Advantage: the ability to specify an overall timeout if wanted.

    import time
    # private helpers from concurrent.futures._base used by the copied code
    from concurrent.futures._base import (
        CANCELLED_AND_NOTIFIED, FINISHED, _AS_COMPLETED,
        _AcquireFutures, _create_and_install_waiters, _yield_finished_futures)

    class AsCompletedWaiterWrapper:
        def __init__(self):
            self.fs = None
            self.pending = None
            self.waiter = None
    
        def listen(self, fut):
            with self.waiter.lock:
                self.fs.add(fut)
                self.pending.add(fut)
                fut._waiters.append(self.waiter)
    
        def as_completed(self, fs, timeout=None):
            """
            concurrent.futures.as_completed plus the 3 lines marked with +.
            """
            if timeout is not None:
                end_time = timeout + time.monotonic()
    
            fs = set(fs)
            total_futures = len(fs)
            with _AcquireFutures(fs):
                finished = set(
                        f for f in fs
                        if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
                pending = fs - finished
                waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
            self.fs = fs            # +
            self.pending = pending  # +
            self.waiter = waiter    # +
            finished = list(finished)
            try:
                yield from _yield_finished_futures(finished, waiter,
                                                   ref_collect=(fs,))
    
                while pending:
                    if timeout is None:
                        wait_timeout = None
                    else:
                        wait_timeout = end_time - time.monotonic()
                        if wait_timeout < 0:
                            raise TimeoutError(
                                    '%d (of %d) futures unfinished' % (
                                    len(pending), total_futures))
    
                    waiter.event.wait(wait_timeout)
    
                    with waiter.lock:
                        finished = waiter.finished_futures
                        waiter.finished_futures = []
                        waiter.event.clear()
    
                    # reverse to keep finishing order
                    finished.reverse()
                    yield from _yield_finished_futures(finished, waiter,
                                                       ref_collect=(fs, pending))
    
            finally:
                # Remove waiter from unfinished futures
                for f in fs:
                    with f._condition:
                        f._waiters.remove(waiter)
    

    Usage:

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(fn, job): job for job in jobs}
        w = AsCompletedWaiterWrapper()
        for fut in w.as_completed(futures):
            if fut.exception():
                job = futures[fut]
                new_fut = executor.submit(fn, job)
                futures[new_fut] = job
                w.listen(new_fut)
            else:
                ...  # logic to handle successful job
    
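    Since this keeps as_completed's deadline machinery intact, an overall timeout works as usual. A sketch, assuming a 60-second budget for the whole batch, retries included:

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(fn, job): job for job in jobs}
        w = AsCompletedWaiterWrapper()
        try:
            for fut in w.as_completed(futures, timeout=60):
                ...  # retry / success logic as above
        except TimeoutError:
            ...  # budget exhausted; unfinished futures remain pending
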

    Retry from job helper

    Wait for the events inside the with ... executor: block, because ThreadPoolExecutor.__exit__ shuts down the executor, after which new futures cannot be scheduled. (This is also why the attempt in the question fails: the with block exits as soon as executor.map returns, so the worker's resubmission happens after shutdown has begun.)

    Trade-offs:

    1. Would not work with ProcessPoolExecutor, since the worker needs a reference to the executor, which only exists in the main process.
    2. Troublesome if you want to specify an overall timeout.

    import concurrent.futures
    import functools
    import threading

    def worker_job(fn, event):
        try:
            rv = fn()
            event.set()  # signal completion only on success
            return rv
        except Exception:
            # resubmit the whole helper so a retry can itself be retried
            executor.submit(worker_job, fn, event)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        jobs = [functools.partial(fn, arg) for arg in args]
        events = [threading.Event() for _ in range(len(jobs))]
        executor.map(worker_job, jobs, events)
        # wait inside the with block; once __exit__ starts the shutdown,
        # resubmission would raise "cannot schedule new futures after shutdown"
        for e in events:
            e.wait()
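
    A maximum retry count fits here as well. A minimal sketch with a hypothetical retries_left parameter; note that the event must also be set when retries run out, otherwise the main thread would wait forever:

    def worker_job(fn, event, retries_left):
        try:
            rv = fn()
            event.set()
            return rv
        except Exception:
            if retries_left > 0:
                executor.submit(worker_job, fn, event, retries_left - 1)
            else:
                event.set()  # give up, but still unblock the waiter

    Pass the initial budget as a third map argument, e.g. executor.map(worker_job, jobs, events, [3] * len(jobs)).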