Tags: python, multithreading, concurrent.futures

ThreadPoolExecutor and time-outs for individual threads


I have the following code that yields a TimeoutError for the third result.

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def sleeping(i):
    time.sleep(i)

    return f'slept {i} seconds'


with ThreadPoolExecutor(max_workers=1) as exe:
    try:
        for result in exe.map(sleeping, [1, 2, 10, 3], timeout=5):
            print(result)
    except TimeoutError:
        print('Timeout waiting for map()')

The problem is that the current code checks, for each result, whether a timeout occurred, and then the whole thread pool is closed. I would like all the tasks to complete. Desired output:

slept 1 seconds
slept 2 seconds
Timeout waiting for map()
slept 3 seconds

Please advise.


Solution

  • Unfortunately, this is not a trivial thing to do.

    The timeout raised by a call to executor.map will always cancel all remaining tasks. The executor itself is not shut down, but all subsequent tasks in the map call are cancelled. In any case, it would be troublesome for concurrent.futures to "recover" a worker that timed out: it can't really reuse a worker that is still busy running a previous task.

    Anyway, if you need to retry the timed-out tasks or similar, the solution is to manage the timeout yourself, inside the function that runs in the worker thread: an exception raised by your code there won't disturb the executor, and it can be handled individually on the controller thread by using executor.submit and handling the returned Future objects, instead of executor.map.
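    As a minimal illustration of that difference (the toy work function and its simulated failure are made up for this sketch): with executor.submit, an exception raised inside a worker is stored on its Future and only re-raised when .result() is called, so each task can be handled on its own:

```python
from concurrent.futures import ThreadPoolExecutor


def work(i):
    if i == 3:
        # stand-in for a timeout the worker detected by itself
        raise TimeoutError(f"task {i} gave up")
    return i * 2


with ThreadPoolExecutor(max_workers=2) as exe:
    futures = [exe.submit(work, i) for i in range(5)]

results = []
for future in futures:
    try:
        results.append(future.result())
    except TimeoutError as exc:
        results.append(str(exc))

print(results)  # the failed task does not affect its siblings
```

    Contrast this with executor.map, where the first exception raised while iterating over the results ends the whole iteration.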

    There is, however, one extra problem: it is not really easy in Python to raise a timeout error inside the same thread (i.e. the worker thread would have to "time out" itself).

    (0) If you just want to keep going with the remaining tasks, and don't mind that the timed-out task keeps consuming a worker thread until it is finished, it is just a matter of using the executor's .submit method and handling the futures, instead of the .map call. I present some code for this case below.

    (1) If the code that is executed in the threads can be made to run asynchronously, using asyncio, then there is asyncio.timeout. But if your workers can be rewritten to be async, then maybe your whole project could be rewritten as asynchronous code instead of using multithreading to start with.
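    As a sketch of proposal #1, assuming the workers can be turned into coroutines (the awaitable sleeping below stands in for real async work, and the timings are scaled down so the demo runs quickly), asyncio.wait_for imposes a per-task timeout without disturbing the other tasks:

```python
import asyncio


async def sleeping(i):
    await asyncio.sleep(i)
    return f"slept {i} seconds"


async def main():
    results = []
    # 1.0 exceeds the 0.5s deadline; the other tasks are unaffected
    for i in [0.1, 0.2, 1.0, 0.3]:
        try:
            results.append(await asyncio.wait_for(sleeping(i), timeout=0.5))
        except asyncio.TimeoutError:
            results.append(f"task {i} timed out")
    return results


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

    Unlike a thread, a coroutine really is cancelled when the timeout fires, so no worker stays stuck.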

    Or (2) you could instrument your code so that it measures elapsed time and stops the processing itself, raising TimeoutError instead of continuing. This is the simplest and most reliable option, but not always possible. Unfortunately, a "timeout decorator" can't be written without itself resorting to other threads.

    (3) Another layer of workers could be created inside each worker of the main thread-pool executor, so that the first layer manages the second layer's timeouts and restarts the inner workers when needed. If the second layer is made of sub-processes instead of threads, it becomes possible to kill them over time to free up resources, in case timed-out tasks get stuck.
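    A sketch of proposal #3, using one throwaway sub-process per task (the supervised helper and the scaled-down timings are my own invention): the outer thread joins the process with a deadline and terminates it on timeout, which is something that cannot be done to a thread:

```python
import time
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor


def sleeping(i, queue):
    time.sleep(i)
    queue.put(f"slept {i} seconds")


def supervised(i, timeout):
    """Run sleeping(i) in a disposable process, killing it on timeout."""
    queue = mp.Queue()
    proc = mp.Process(target=sleeping, args=(i, queue))
    proc.start()
    proc.join(timeout)
    if proc.is_alive():
        proc.terminate()  # unlike a thread, a process can be killed
        proc.join()
        return f"task {i} timed out"
    return queue.get()


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=1) as exe:
        # exe.map zips the two iterables: supervised(0.1, 0.5), ...
        for result in exe.map(supervised, [0.1, 0.2, 2.0, 0.3], [0.5] * 4):
            print(result)
```

    The cost is one extra process per task (or per inner worker, with a pool), which is only worth paying when stuck tasks really must be reaped.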

    (4) Just like (0), but create a bigger executor, with spare worker threads, and craft a high-level substitute for the map call that starts using more threads as the first ones get stuck in their processing. This could internally use the submit and wait functions of concurrent.futures.

    So, with the example code, where the workers just call time.sleep, one can't really opt for proposal #2 above: a real function would take note of the start time and, inside the loop where it does its calculations, periodically compare the current time to the start time, raising TimeoutError once the timeout has expired. (A special "sleep" call could be written to do so, but it would be pointless.)
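    For a worker that does run its own computation loop, the self-instrumentation of proposal #2 might look like this hypothetical chunked worker (busy_work and its parameters are made up for illustration):

```python
import time


def busy_work(iterations, timeout):
    """Do work in small chunks, checking the clock between chunks."""
    start = time.monotonic()
    total = 0
    for n in range(iterations):
        total += n  # a chunk of real computation would go here
        if time.monotonic() - start >= timeout:
            # bail out instead of continuing; the controller thread can
            # catch this per-future via future.result()
            raise TimeoutError(f"gave up after {n + 1} iterations")
    return total


if __name__ == "__main__":
    print(busy_work(1000, 10.0))  # finishes well within the deadline
```

    Because the exception is raised by the worker's own code, it is captured on the corresponding Future and the executor itself is untouched.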

    If your workers are doing I/O, and the reason they fail is that the server they fetch data from sometimes times out itself, then it might really be easier to rewrite your code to use asyncio (proposal #1).

    Option #3 above is complicated to implement, but feasible. If the second layer uses sub-processes, it may be the only way to free up stuck resources when timed-out tasks keep running forever. (If, instead, they just take longer than they'd be useful but do eventually finish, things can be simpler.)

    Anyway, here is the code for option #0 above; if needed, it can evolve into what I suggest in #4 or #3 for your production code:

    import time
    from concurrent.futures import ThreadPoolExecutor, TimeoutError, wait


    def sleeping(i):
        time.sleep(i)

        return f'slept {i} seconds'


    data = [1, 2, 9, 3]
    TIMEOUT = 5

    with ThreadPoolExecutor(max_workers=1) as exe:
        futures = {exe.submit(sleeping, i) for i in data}
        # loop until every future has either completed or been soft-cancelled
        # (soft-cancelled futures are simply dropped from the set)
        while futures:
            done, pending = wait(futures, timeout=TIMEOUT)
            for future in done:
                print(future.result())
            futures = set()
            for future in pending:
                if started_at := getattr(future, "started_at", None):
                    if (measured := time.monotonic() - started_at) >= TIMEOUT:
                        # the worker thread keeps running, but we stop
                        # waiting for this future
                        future.soft_cancel = True
                        print(f"{future} canceled due to timeout: {measured}")
                        future = None
                elif future.running():
                    # first time this future is seen running: record when
                    future.started_at = time.monotonic()
                if future is not None:
                    print("waiting for another cycle: ", future)
                    futures.add(future)

    Note that the code attaches "extraneous" attributes to the futures, without changing their class: Python allows us to do that, and it is needed here; otherwise we'd need an external way to map each running future to the metadata (soft_cancel, started_at) we need. It is an "abuse" of OOP, but not an abuse of a dynamic language.

    The asyncio library, unlike concurrent.futures, allows one to subclass its Futures, and these attributes could be added in a more elegant way there; the ad-hoc attributes are equally functional, though.