
Run an async function from a sync function within an already-running event loop


In my Python application, I have a sync function boo() that is called inside a running event loop. boo() has to get some data from foo(arg1, arg2), which is an async function.

Unfortunately, I can't turn boo() into an async function. It must stay synchronous. (This constraint is out of my hands.)

How can I call foo(arg1, arg2) from within boo(), wait until it completes, and continue the execution?

Minimal Reproducible Example

I tried to create a minimal reproducible example. This is the closest I could get. The real application is big and complex, and may behave differently.

import time
import asyncio

async def work_for_data():
    time.sleep(3)
    return 42

# sync function, calling async function
def get_number():
    return asyncio.get_event_loop().run_until_complete(work_for_data())

async def get_data():
    return get_number()

async def run():
    loop = asyncio.get_event_loop()
    task = asyncio.create_task(get_data())
    loop.run_until_complete(task)


if __name__ == "__main__":
    asyncio.run(run())

This code raises:

  File "./minimal_example.py", line 9, in get_number
    return asyncio.get_event_loop().run_until_complete(work_for_data())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 629, in run_until_complete
    self._check_running()
  File "/usr/local/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 588, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

Attempts To Solve The Problem

I made several attempts to solve it, and none of them worked.

Attempt 1

data = asyncio.run(foo(arg1, arg2))

Raised the following exception:

    data = asyncio.run(foo(arg1, arg2))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/.pycharm_helpers/pydevd_asyncio/pydevd_nest_asyncio.py", line 143, in run
    loop.run_until_complete(task)
  File "uvloop/loop.pyx", line 1511, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1504, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1377, in uvloop.loop.Loop.run_forever
  File "uvloop/loop.pyx", line 518, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.

Attempt 2

loop = asyncio.get_event_loop()
data = loop.run_until_complete(foo(arg1, arg2))

Raised the following exception:

    data = loop.run_until_complete(foo(arg1, arg2))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "uvloop/loop.pyx", line 1511, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1504, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1377, in uvloop.loop.Loop.run_forever
  File "uvloop/loop.pyx", line 518, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.

Attempt 3

loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as executor:
    future = executor.submit(lambda: asyncio.run_coroutine_threadsafe(foo(arg1, arg2), loop).result())
    data = future.result()

The interpreter got stuck when executing future.result()

Attempt 4

    loop = asyncio.get_event_loop()
    future = asyncio.Future()

    def callback(task):
        if task.exception():
            future.set_exception(task.exception())
        else:
            future.set_result(task.result())

    task = asyncio.run_coroutine_threadsafe(foo(arg1, arg2), loop)
    task.add_done_callback(callback)

    result = task.result()  ## Stuck here
    return result

The interpreter got stuck when executing task.result()


Solution

  • There is no straightforward way to do that. Usually, async functions can only be called from async functions: that's just the way it is meant to work.

    The reason is that an intermediate synchronous function in a call chain cannot be "paused" when an inner function awaits and expects the loop to run other concurrent tasks. There is no mechanism to pause and resume a synchronous function sitting in the middle of the chain; that is exactly what async def provides.
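To make the point above concrete, here is a minimal sketch of why blocking inside the loop's own thread cannot work. The names foo and boo come from the question; the body of foo, the 0.2 second timeout, and passing the loop as an argument are assumptions made only for this illustration. The timeout is there so the demo reports the stall instead of hanging forever.

```python
import asyncio
import concurrent.futures


async def foo(arg1, arg2):
    # Stand-in for the real async work (assumed body).
    await asyncio.sleep(0.01)
    return arg1 + arg2


def boo(loop):
    # Schedule foo() on the very loop that is currently executing boo().
    fut = asyncio.run_coroutine_threadsafe(foo(1, 2), loop)
    try:
        # This blocks the loop's own thread, so the loop cannot start foo().
        fut.result(timeout=0.2)
        blocked = False
    except concurrent.futures.TimeoutError:
        blocked = True
    return blocked, fut


async def main():
    blocked, fut = boo(asyncio.get_running_loop())
    # Back in async code the loop is free again, so foo() can finally run.
    value = await asyncio.wrap_future(fut)
    return blocked, value


blocked, value = asyncio.run(main())
```

In this sketch the wait inside boo() times out, and the result only becomes available after boo() has returned control to the event loop. Without the timeout, the call would wait forever, which matches the "stuck" behaviour of Attempts 3 and 4.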

    So, the obvious and correct approach is to make the whole call chain leading to a function that needs to await an I/O operation (or equivalent) consist of async calls.

    Sometimes this leads to duplicated code, with two very similar functions, one meant to be called from sync code and one from async code. Even large code bases and advanced frameworks struggle to overcome that.

    However, if your inner function does not itself need the result of the async function, i.e., it can just create a task for the async function and return that task, then the async function calling the synchronous function should keep the returned task and await it.

    Keep in mind that callback-heavy code can be hard to follow, and that is actually one of the main motivations for adding the async syntax and mechanisms to the language in the first place.

    So, for code that simply doesn't need the return value inside the synchronous function:

    import asyncio

    async def work_for_data():
        # asyncio.sleep() yields to the event loop; time.sleep() would block it
        await asyncio.sleep(3)
        return 42

    # sync function, calling async function
    def get_number():
        # schedule the coroutine on the running loop and return at once
        task = asyncio.create_task(work_for_data())
        return task

    async def get_data():
        # trigger sync function that will start the tasks for producing async data
        number_task = get_number()
        # do more concurrent stuff

        ...
        # await the data produced by `get_number()`:
        number = await number_task
        return number

    async def run():
        task = asyncio.create_task(get_data())
        await task


    if __name__ == "__main__":
        asyncio.run(run())
    

    Another approach is to have the synchronous code create a task for an intermediate function that awaits the target async function and puts its return value in a queue, which can then be consumed elsewhere whenever the result is ready. Or, if the synchronous function itself has code that needs to deal with the values produced by the async code, the way to do it is to set a callback by calling the .add_done_callback method on the created task. The result of the processing done in this callback won't be returned to the async function that called the sync (get_number) function, though. If that is needed, then the plain approach of making your whole call chain async is preferable.
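The queue and callback variants described above could look roughly like the sketch below. All helper names (forward_to_queue, get_number_via_queue, get_number_via_callback, on_done) are hypothetical and chosen only for this example, as is the "+ 1" processing step in the callback. The short sleep stands in for real I/O.

```python
import asyncio


async def work_for_data():
    await asyncio.sleep(0.01)  # stand-in for real async I/O
    return 42


async def forward_to_queue(queue):
    # Intermediate coroutine: awaits the target and publishes its result.
    queue.put_nowait(await work_for_data())


def get_number_via_queue(queue):
    # Sync function: only schedules the work and never waits for it.
    return asyncio.create_task(forward_to_queue(queue))


def get_number_via_callback(results, done):
    # Sync function: the value is processed in a callback, not returned.
    task = asyncio.create_task(work_for_data())

    def on_done(finished):
        # Called by the event loop once the task has finished.
        results.append(finished.result() + 1)
        done.set()

    task.add_done_callback(on_done)
    return task


async def main():
    queue = asyncio.Queue()
    results, done = [], asyncio.Event()
    # Keep references to the tasks so they are not garbage collected early.
    producer = get_number_via_queue(queue)
    worker = get_number_via_callback(results, done)
    from_queue = await queue.get()  # consumed whenever the value is ready
    await done.wait()               # wait until the callback has run
    await producer
    await worker
    return from_queue, results


from_queue, results = asyncio.run(main())
```

Note that in both variants the synchronous function returns before the data exists. The value reaches the rest of the program through the queue or through the callback's side effect, never as the return value of the sync call.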