Tags: python, subprocess, python-multiprocessing, python-asyncio, process-pool

How to terminate long-running computation (CPU bound task) in Python using asyncio and concurrent.futures.ProcessPoolExecutor?


Similar question (but the answer does not work for me): How to cancel long-running subprocesses running using concurrent.futures.ProcessPoolExecutor?

Unlike the question linked above and the solution provided there, in my case the computation itself is a single long CPU-bound operation. It cannot be restructured as a loop that periodically checks whether some event has been set.

Reduced version of the code below:

import asyncio
import concurrent.futures as futures
import time

class Simulator:
    def __init__(self):
        self._loop = None
        self._lmz_executor = None
        self._tasks = []
        self._max_execution_time = time.monotonic() + 60
        self._long_running_tasks = []

    def initialise(self):
        # Initialise the main asyncio loop
        self._loop = asyncio.get_event_loop()
        self._loop.set_default_executor(
            futures.ThreadPoolExecutor(max_workers=3))

        # Run separate processes of long computation task
        self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3)

    def run(self):
        self._tasks.extend(
            [self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]]
        )

        try:
            # Gather bot reasoner tasks
            _reasoner_tasks = asyncio.gather(*self._tasks)
            # Send the reasoner tasks to main monitor task
            asyncio.gather(self.sample_main_loop(_reasoner_tasks))
            self._loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            self._loop.close()

    async def sample_main_loop(self, reasoner_tasks):
        """This is the main monitor task"""
        await asyncio.wait_for(reasoner_tasks, None)
        for task in self._long_running_tasks:
            try:
                await asyncio.wait_for(task, 10)
            except asyncio.TimeoutError:
                print("Oops. Some long operation timed out.")
                task.cancel()  # Doesn't cancel and has no effect
                task.set_result(None)  # Doesn't seem to have an effect

        self._lmz_executor.shutdown()
        self._loop.stop()
        print('And now I am done. Yay!')

    async def bot_reasoning_loop(self, bot):
        import math

        _exec_count = 0
        _sleepy_time = 15
        _max_runs = math.floor(
            (self._max_execution_time - time.monotonic()) / _sleepy_time)

        self._long_running_tasks.append(
            self._loop.run_in_executor(
                    self._lmz_executor, really_long_process, _sleepy_time))

        while time.monotonic() < self._max_execution_time:
            print("Bot#{}: thinking for {}s. Run {}/{}".format(
                    bot, _sleepy_time, _exec_count, _max_runs))
            await asyncio.sleep(_sleepy_time)
            _exec_count += 1

        print("Bot#{} Finished Thinking".format(bot))

def really_long_process(sleepy_time):
    print("I am a really long computation.....")
    _large_val = 9729379273492397293479237492734 ** 344323
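    # Converting a ~10-million-digit int to str is extremely slow, and recent
    # Python versions reject it by default with ValueError; print its size.
    print("I finally computed a {}-bit value".format(_large_val.bit_length()))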

if __name__ == "__main__":
    sim = Simulator()
    sim.initialise()
    sim.run()

The idea is that a main simulation loop runs and monitors three bot coroutines. Each bot performs some reasoning but also starts a really long background computation using ProcessPoolExecutor, which may end up running longer than the bot's own threshold (maximum execution time) for reasoning.

As you can see in the code above, I attempted to .cancel() these tasks when a timeout occurs. However, this does not actually cancel the computation: it keeps running in the background, and the asyncio loop doesn't terminate until all the long-running computations have finished.

How do I terminate such long running CPU-bound computations within a method?

Other similar SO questions, but not necessarily related or helpful:

  1. asyncio: Is it possible to cancel a future been run by an Executor?
  2. How to terminate a single async task in multiprocessing if that single async task exceeds a threshold time in Python
  3. Asynchronous multiprocessing with a worker pool in Python: how to keep going after timeout?

Solution

  • How do I terminate such long running CPU-bound computations within a method?

    The approach you tried doesn't work because the futures returned by ProcessPoolExecutor cannot be cancelled once they are running. Although asyncio's run_in_executor propagates the cancellation to the underlying concurrent.futures.Future, Future.cancel simply refuses it (returning False) once the task has started executing.
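The refusal can be reproduced without any worker processes. The sketch below (the helper names are made up for illustration) puts a bare concurrent.futures.Future into the RUNNING state by hand, which is what an executor does when a worker is about to take the call, and then tries to cancel it both directly and through asyncio.wrap_future, the wrapper that run_in_executor returns.

```python
import asyncio
import concurrent.futures


def cancel_running_cf_future():
    """Put a plain concurrent.futures.Future into RUNNING state, then cancel it."""
    fut = concurrent.futures.Future()
    # Executors call this when a worker is about to run the call;
    # from here on the future counts as running.
    fut.set_running_or_notify_cancel()
    refused = not fut.cancel()  # cancel() returns False for running futures
    return refused, fut.running()


async def cancel_wrapped_future():
    """Same, but cancel through asyncio.wrap_future, as run_in_executor does."""
    cf_fut = concurrent.futures.Future()
    cf_fut.set_running_or_notify_cancel()
    aio_fut = asyncio.wrap_future(cf_fut)
    aio_fut.cancel()        # the asyncio future is cancelled immediately...
    await asyncio.sleep(0)  # ...and a done-callback forwards cancel() to cf_fut
    return aio_fut.cancelled(), cf_fut.running()


if __name__ == "__main__":
    print(cancel_running_cf_future())            # (True, True)
    print(asyncio.run(cancel_wrapped_future()))  # (True, True)
```

The asyncio side reports itself as cancelled, which is why the caller's wait_for returns, while the concurrent.futures side stays RUNNING and the worker keeps computing.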

    There is no fundamental reason for that. Unlike threads, processes can be safely terminated, so it would be perfectly possible for ProcessPoolExecutor.submit to return a future whose cancel terminated the corresponding process. Asyncio coroutines have well-defined cancellation semantics and could automatically make use of it. Unfortunately, ProcessPoolExecutor.submit returns a regular concurrent.futures.Future, which assumes the lowest common denominator of the underlying executors, and treats a running future as untouchable.
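To illustrate that a worker process really can be stopped in the middle of a CPU-bound call, here is a minimal sketch (start_and_kill is a hypothetical helper) that runs the question's big-integer power in a single-worker multiprocessing.Pool and terminates the pool half a second later. It uses the builtin pow as the task so that the worker does not need to import any function from __main__.

```python
import multiprocessing
import time

# The big-integer power from the question: a long, pure C-level computation.
BASE, EXP = 9729379273492397293479237492734, 344323


def start_and_kill(delay=0.5):
    """Run pow(BASE, EXP) in a one-worker Pool and terminate it after `delay` s.

    Returns (finished_before_terminate, seconds_spent_in_terminate_and_join).
    """
    pool = multiprocessing.Pool(1)
    # builtins.pow pickles by reference, so the worker does not need to
    # import any function defined in __main__.
    result = pool.apply_async(pow, (BASE, EXP))
    time.sleep(delay)
    finished = result.ready()
    started = time.monotonic()
    pool.terminate()  # stops the worker process even though pow() is running
    pool.join()
    return finished, time.monotonic() - started


if __name__ == "__main__":
    finished, elapsed = start_and_kill()
    print("finished before terminate:", finished)
    print("terminate() and join() took {:.2f}s".format(elapsed))
```

terminate() returns quickly instead of waiting for pow() to finish; the exact timing depends on the machine. A thread stuck in the same C call offers no equivalent way to be stopped from outside.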

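    As a result, to cancel tasks executed in subprocesses, one must circumvent the ProcessPoolExecutor altogether and manage one's own processes. The challenge is how to do this without reimplementing half of multiprocessing. One option offered by the standard library is to (ab)use multiprocessing.Pool for this purpose, because it supports reliable shutdown of worker processes. A CancellablePool could work as follows: keep a fixed number of single-worker pools instead of one pool with several workers, hand each task to a free pool from an asyncio coroutine, and, if that coroutine is cancelled while the task is still running, terminate that one pool and replace it with a fresh one. Since all of this bookkeeping happens on the event loop thread, tracking which pools are free needs no locking.

    Here is a sample implementation of that idea: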

    import asyncio
    import multiprocessing
    
    class CancellablePool:
        def __init__(self, max_workers=3):
            self._free = {self._new_pool() for _ in range(max_workers)}
            self._working = set()
            self._change = asyncio.Event()
    
        def _new_pool(self):
            return multiprocessing.Pool(1)
    
        async def apply(self, fn, *args):
            """
            Like multiprocessing.Pool.apply_async, but:
             * is an asyncio coroutine
             * terminates the process if cancelled
            """
            while not self._free:
                await self._change.wait()
                self._change.clear()
            pool = usable_pool = self._free.pop()
            self._working.add(pool)
    
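            loop = asyncio.get_running_loop()
            fut = loop.create_future()
            # Pool invokes these callbacks from its result-handler thread, so
            # hand the outcome to the event loop thread-safely, and ignore it
            # if the future was already cancelled in the meantime.
            def _on_done(obj):
                loop.call_soon_threadsafe(
                    lambda: fut.done() or fut.set_result(obj))
            def _on_err(err):
                loop.call_soon_threadsafe(
                    lambda: fut.done() or fut.set_exception(err))
            pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err)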
    
            try:
                return await fut
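            except asyncio.CancelledError:
                # Cancelled (e.g. by wait_for): kill the worker mid-task,
                # replace its pool, and let the cancellation propagate so
                # that wait_for reports a timeout to the caller.
                pool.terminate()
                usable_pool = self._new_pool()
                raise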
            finally:
                self._working.remove(pool)
                self._free.add(usable_pool)
                self._change.set()
    
        def shutdown(self):
            for p in self._working | self._free:
                p.terminate()
            self._free.clear()
    

    A minimalistic test case showing cancellation:

    def really_long_process():
        print("I am a really long computation.....")
        large_val = 9729379273492397293479237492734 ** 344323
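        # Printing the ~10-million-digit value itself is extremely slow and is
        # rejected by recent Python versions, so report its size instead.
        print("I finally computed a {}-bit value".format(large_val.bit_length()))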
    
    async def main():
        pool = CancellablePool()
    
        # Start five tasks; at most three run at once (max_workers=3).
        tasks = [asyncio.create_task(pool.apply(really_long_process))
                 for _ in range(5)]
        for t in tasks:
            try:
                await asyncio.wait_for(t, 1)
            except asyncio.TimeoutError:
                print('task timed out and cancelled')
        pool.shutdown()
    
    # The guard is required on platforms that spawn worker processes.
    if __name__ == "__main__":
        asyncio.run(main())
    

    Note how the CPU usage never exceeds 3 cores, and how it starts dropping near the end of the test, indicating that the processes are being terminated as expected.

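    To apply it to the code from the question, make self._lmz_executor an instance of CancellablePool and change self._loop.run_in_executor(self._lmz_executor, really_long_process, _sleepy_time) to self._loop.create_task(self._lmz_executor.apply(really_long_process, _sleepy_time)). Also drop the task.cancel() and task.set_result(None) calls in sample_main_loop: asyncio.wait_for already cancels the task on timeout, and Task.set_result raises RuntimeError.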
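The monitoring part of the question then reduces to starting the tasks and waiting on each with a timeout. The following is a minimal sketch of that pattern; run_with_timeouts and fake_apply are hypothetical names, and apply is assumed to be a coroutine function with the same (fn, *args) signature as CancellablePool.apply. fake_apply only sleeps, so the sketch runs without worker processes.

```python
import asyncio


async def run_with_timeouts(apply, fn, arg_tuples, timeout):
    """Start apply(fn, *args) for every tuple, then wait on each task in turn.

    asyncio.wait_for cancels a task that exceeds `timeout`; with
    CancellablePool.apply that cancellation terminates the worker process.
    Returns the results in order, with None for timed-out tasks.
    """
    tasks = [asyncio.create_task(apply(fn, *args)) for args in arg_tuples]
    results = []
    for task in tasks:
        try:
            results.append(await asyncio.wait_for(task, timeout))
        except asyncio.TimeoutError:
            print("Oops. Some long operation timed out.")
            results.append(None)
    return results


async def fake_apply(fn, delay):
    """Hypothetical stand-in for CancellablePool.apply: sleep, then call fn."""
    await asyncio.sleep(delay)
    return fn(delay)


if __name__ == "__main__":
    # Prints the timeout message, then ['0.01', None].
    print(asyncio.run(run_with_timeouts(fake_apply, str, [(0.01,), (5,)], 0.2)))
```

With the real pool one would pass pool.apply instead (for example run_with_timeouts(pool.apply, really_long_process, [(15,)] * 3, 10)) and call pool.shutdown() afterwards. As in the question, each timeout is counted from the moment the monitor starts waiting on that task, not from when the task started.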