
python asyncio - cancelling a `to_thread` task won't stop the thread


With the following snippet, I can't figure out why infiniteTask is not cancelled (it keeps printing "I'm still standing").

In debug mode, I can see that the Task stored in unfinished is indeed marked as cancelled, but obviously the thread is not cancelled or killed.

Why is the thread not killed when the wrapping task is cancelled? What should I do to stop the thread?

import time
import asyncio

def quickTask():
    time.sleep(1)

def infiniteTask():
    while True:
        time.sleep(1)
        print("I'm still standing")

async def main():
    finished, unfinished = await asyncio.wait({
            asyncio.create_task(asyncio.to_thread(quickTask)),
            asyncio.create_task(asyncio.to_thread(infiniteTask))
        },
        return_when=asyncio.FIRST_COMPLETED
    )

    for task in unfinished:
        task.cancel()
    await asyncio.wait(unfinished)

    print("  finished : " + str(len(finished)))  # prints 1
    print("unfinished : " + str(len(unfinished)))  # prints 1

    
asyncio.run(main())

Solution

  • Cause

    If we check the definition of asyncio.to_thread():

    # python310/Lib/asyncio/threads.py
    # ...
    
    async def to_thread(func, /, *args, **kwargs):
        """Asynchronously run function *func* in a separate thread.
    
        Any *args and **kwargs supplied for this function are directly passed
        to *func*. Also, the current :class:`contextvars.Context` is propagated,
        allowing context variables from the main thread to be accessed in the
        separate thread.
    
        Return a coroutine that can be awaited to get the eventual result of *func*.
        """
        loop = events.get_running_loop()
        ctx = contextvars.copy_context()
        func_call = functools.partial(ctx.run, func, *args, **kwargs)
        return await loop.run_in_executor(None, func_call)
    

    It's actually a thin wrapper around loop.run_in_executor.

    If we then look at how asyncio's own tests handle cancelling run_in_executor:

    # python310/Lib/test/test_asyncio/test_events.py
    # ...
    
    class EventLoopTestsMixin:
        # ...
    
        def test_run_in_executor_cancel(self):
            called = False
    
            def patched_call_soon(*args):
                nonlocal called
                called = True
    
            def run():
                time.sleep(0.05)
    
            f2 = self.loop.run_in_executor(None, run)
            f2.cancel()
            self.loop.run_until_complete(
                    self.loop.shutdown_default_executor())
            self.loop.close()
            self.loop.call_soon = patched_call_soon
            self.loop.call_soon_threadsafe = patched_call_soon
            time.sleep(0.4)
            self.assertFalse(called)
    

    You can see that, even after f2.cancel(), the test still has to wait for self.loop.shutdown_default_executor().

    Now let's see what that method looks like.

    # python310/Lib/asyncio/base_events.py
    # ...
    
    class BaseEventLoop(events.AbstractEventLoop):
        # ...
    
        async def shutdown_default_executor(self):
            """Schedule the shutdown of the default executor."""
            self._executor_shutdown_called = True
            if self._default_executor is None:
                return
            future = self.create_future()
            thread = threading.Thread(target=self._do_shutdown, args=(future,))
            thread.start()
            try:
                await future
            finally:
                thread.join()
    
        def _do_shutdown(self, future):
            try:
                self._default_executor.shutdown(wait=True)
                self.call_soon_threadsafe(future.set_result, None)
            except Exception as ex:
                self.call_soon_threadsafe(future.set_exception, ex)
    

    Here we can see that it starts another thread to run _do_shutdown, which in turn calls self._default_executor.shutdown with wait=True.

    And this is where shutdown is implemented:

    # Python310/Lib/concurrent/futures/thread.py
    # ...
    
    class ThreadPoolExecutor(_base.Executor):
        # ...
    
        def shutdown(self, wait=True, *, cancel_futures=False):
            with self._shutdown_lock:
                self._shutdown = True
                if cancel_futures:
                    # Drain all work items from the queue, and then cancel their
                    # associated futures.
                    while True:
                        try:
                            work_item = self._work_queue.get_nowait()
                        except queue.Empty:
                            break
                        if work_item is not None:
                            work_item.future.cancel()
    
                # Send a wake-up to prevent threads calling
                # _work_queue.get(block=True) from permanently blocking.
                self._work_queue.put(None)
            if wait:
                for t in self._threads:
                    t.join()
    

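    When wait=True, it joins every worker thread, that is, it waits for each of them to finish on its own. With cancel_futures=True it only cancels work items that are still queued and have not started yet.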

    Nowhere in this chain is there any attempt to actually interrupt a thread that is already running.
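    The same limitation is visible one level down, on the concurrent.futures futures themselves. The following is a minimal sketch, not code from asyncio: the function and variable names are made up for illustration, and max_workers=1 is chosen only so that the second job is guaranteed to stay queued. Future.cancel() succeeds for the queued job and returns False for the one that is already running.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def cancel_running_vs_queued():
    """Return (cancel() on a running job, cancel() on a queued job, result)."""
    started = threading.Event()
    release = threading.Event()

    def work():
        started.set()
        release.wait(2.0)  # stay "running" until the caller releases us
        return "finished"

    # A single worker (an assumption for this demo) keeps the second job queued.
    with ThreadPoolExecutor(max_workers=1) as pool:
        running = pool.submit(work)
        queued = pool.submit(work)
        started.wait(2.0)  # make sure the first job is really executing

        running_cancelled = running.cancel()  # already running: refused
        queued_cancelled = queued.cancel()    # still in the queue: accepted

        release.set()
        result = running.result(timeout=2.0)

    return running_cancelled, queued_cancelled, result


if __name__ == "__main__":
    print(cancel_running_vs_queued())
```

    The running job still returns its normal result, which shows that the refused cancel() had no effect on the thread.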

    To quote the Trio documentation:

    Cancellation is a tricky issue here, because neither Python nor the operating systems it runs on provide any general mechanism for cancelling an arbitrary synchronous function running in a thread. This function will always check for cancellation on entry, before starting the thread. But once the thread is running, there are two ways it can handle being cancelled:

    • If cancellable=False, the function ignores the cancellation and keeps going, just like if we had called sync_fn synchronously. This is the default behavior.
    • If cancellable=True, then this function immediately raises Cancelled. In this case the thread keeps running in background – we just abandon it to do whatever it’s going to do, and silently discard any return value or errors that it raises.

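    So the takeaway is that there is no way to forcibly terminate an infinite loop running in a thread from the outside. Cancelling the asyncio task only stops the event loop from waiting on the thread; the thread itself keeps running.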
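    This can be observed directly with a worker that finishes on its own. The sketch below uses hypothetical names (slow_worker, cancel_and_observe) and arbitrary short delays; it cancels the task while the thread is still sleeping and then checks whether the thread nevertheless ran to completion.

```python
import asyncio
import threading
import time


def slow_worker(done: threading.Event) -> None:
    time.sleep(0.3)
    done.set()  # only reached if the thread was NOT killed


async def cancel_and_observe():
    """Return (was the task cancelled?, did the thread still finish?)."""
    done = threading.Event()
    task = asyncio.create_task(asyncio.to_thread(slow_worker, done))

    await asyncio.sleep(0.05)  # give the worker thread time to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    # Wait for the flag in another thread so the event loop is not blocked.
    thread_finished = await asyncio.to_thread(done.wait, 2.0)
    return task.cancelled(), thread_finished


if __name__ == "__main__":
    print(asyncio.run(cancel_and_observe()))
```

    Both values come back True: the task is reported as cancelled, yet the function in the thread runs to its last line.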


    Workaround

    Since we now know that whatever runs in the thread has to be designed with a bit more care, we need a way to signal the thread that we want it to stop.

    We can use threading.Event for such cases.

    (Originally I wrote this answer with asyncio.Event, but that class is not thread-safe. Because we are moving the function's execution to another thread, it is better not to use it here.)

    import time
    import asyncio
    import threading
    
    
    def blocking_func(event: threading.Event):
        while not event.is_set():
            time.sleep(1)
            print("I'm still standing")
    
    
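    async def main():
        event = threading.Event()
        # Keep a reference to the task so it is not garbage-collected mid-run.
        task = asyncio.create_task(asyncio.to_thread(blocking_func, event))
    
        await asyncio.sleep(5)
        # Signal the thread to stop, then wait for it to exit.
        event.set()
        await task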
    
    asyncio.run(main())
    

    Because the function checks the event on every iteration of its loop, the program terminates gracefully:

    I'm still standing
    I'm still standing
    I'm still standing
    I'm still standing
    I'm still standing
    I'm still standing
    
    Process finished with exit code 0
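
    To make task.cancel() itself stop the thread, as the question asks, the event can be tied to the task's lifetime. This is only a sketch under assumptions: run_stoppable and polling_worker are hypothetical helpers, not part of asyncio, and the worker must cooperate by accepting the event as its first argument and polling it. Using event.wait(timeout) instead of time.sleep() also lets the worker react as soon as the event is set rather than after the full sleep.

```python
import asyncio
import threading


def polling_worker(stop: threading.Event, exited: threading.Event) -> None:
    # stop.wait(timeout) returns True as soon as the event is set,
    # so the loop ends promptly instead of sleeping out a full interval.
    while not stop.wait(timeout=0.05):
        pass  # one unit of real work would go here
    exited.set()


async def run_stoppable(func, *args):
    """Run func(stop_event, *args) in a thread; set stop_event when we leave."""
    stop = threading.Event()
    try:
        return await asyncio.to_thread(func, stop, *args)
    finally:
        # Runs on normal return, on errors and on cancellation alike.
        stop.set()


async def demo() -> bool:
    exited = threading.Event()
    task = asyncio.create_task(run_stoppable(polling_worker, exited))

    await asyncio.sleep(0.2)
    task.cancel()  # the finally block above signals the thread
    try:
        await task
    except asyncio.CancelledError:
        pass

    # Wait in another thread so the event loop is not blocked.
    return await asyncio.to_thread(exited.wait, 2.0)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

    The thread is still not killed; it is asked to stop and exits at its next check, so a worker that never polls the event keeps running exactly as before.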