pythonmultithreadingpython-asyncioevent-loop

How do I create thread-safe asyncio background tasks?


I have an event loop running in a separate thread from the main thread, and I would like to add a background task to this event loop from the main thread using the asyncio.create_task function.

I am able to achieve this using the asyncio.run_coroutine_threadsafe function, but that ends up returning a concurrent.futures._base.Future object as opposed to the asyncio.Task object that the asyncio.create_task function returns.

I would really like to have a Task object returned as I'm making use of the "name" attribute of the Task object in my application logic. Is there any way to implement this?


Solution

    1. We first create a daemon thread in which a new event loop runs.
    2. Function submit runs an arbitrary coroutine in the event loop/thread created in step 1. If argument return_result is True (the default), submit waits for the coroutine to complete and returns its result. Otherwise, submit return a Future and the caller can get the coroutine's result by executing the future's result method. This function is used by the functions described next.
    3. Function create_task creates a task in the daemon thread and returns that task.
    4. The returned task can be awaited and the task's result return by calling await_task with the return_result argument set to True (the default). This will block the main thread until the result can be returned. Alternatively, await_task can be called with return_result=False, in which case a Future will be returned whose result can be obtained later.
    import asyncio
    import threading
    
    _event_loop = asyncio.new_event_loop()
    threading.Thread(target=_event_loop.run_forever, name="Async Runner", daemon=True).start()
    
    def submit(coro, return_result=True):
        """Run a coroutine in the "other thread". If return_result is True,
        we return the result. Otherwise we return the future."""
        future = asyncio.run_coroutine_threadsafe(coro, _event_loop)
        return future.result() if return_result else future
    
    def create_task(coro):
        """Creates a task in the other thread and return the task."""
        async def task_creator():
            return asyncio.create_task(coro)
    
        return submit(task_creator())
    
    def await_task(task, return_result=True):
        """Submit to the return_result thread a coroutine to await the
        passed thread. If wait is True, we wait for the task to complete
        and return the result. Otherwise, we return to the caller a future
        whose result can be obtained when the caller wants."""
        async def waiter():
            return await task
    
        return submit(waiter(), return_result=return_result)
    
    
    if __name__ == '__main__':
        async def some_coro():
            await asyncio.sleep(1)
            return 'Done'
    
        async def main():
            task = create_task(some_coro())
    
            # We choose to have a future returned instead
            # of the actual result from executing the task:
            future = await_task(task, return_result=False)
            ... # Do other work
            print(future.result())
    
        asyncio.run(main())
    

    Prints:

    Done
    

    Update

    Here is another implementation where waiting for the submitted task to complete does not block the main thread by using asyncio.to_thread. Now all functions are async:

    import asyncio
    import threading
    
    _event_loop = asyncio.new_event_loop()
    threading.Thread(target=_event_loop.run_forever, name="Async Runner", daemon=True).start()
    
    async def submit(coro):
        """Run a coroutine in the "other thread and await its result"""
        def submitter():
            future = asyncio.run_coroutine_threadsafe(coro, _event_loop)
            return future.result()
    
        return await asyncio.to_thread(submitter)
    
    async def create_task(coro):
        """Creates a task in the other thread and return the task."""
        async def task_creator():
            return asyncio.create_task(coro)
    
        return await submit(task_creator())
    
    async def await_task(task):
        """Await the task running in the other thread and return its result."""
        async def waiter():
            return await task
    
        return await submit(waiter())
    
    if __name__ == '__main__':
        async def some_coro():
            await asyncio.sleep(1)
            return 'Done'
    
        async def main():
            task = await create_task(some_coro())
            ... # Do other work
            print(await await_task(task))
    
        asyncio.run(main())