pythonjupyter-notebookpython-asyncionest-asyncio

jupyter notebooks-safe asyncio run wrapper method for a library


I'm building a library that leverages asyncio internally. While the user shouldn't be aware of it, the internal implementation currently wraps the async code with the asyncio.run() porcelain wrapper.

However, some users will be executing this library code from a jupyter notebook, and I'm struggling to replace the asyncio.run() with a wrapper that's safe for either environment.

Here's what I've tried:

ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'


def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False)
    try:
        loop = asyncio.get_running_loop()
        task = loop.create_task(async_coroutine)
        result = loop.run_until_complete(task) # <- fails as loop is already running
        # OR
        asyncio.wait_for(task, timeout=None, loop=loop) # <- fails as this is an async method
        result = task.result()
    except RuntimeError as e:
        if _test_mode:
            raise e
        if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
            return asyncio.run(async_coroutine)
    except Exception as e:
        raise e

Requirements

  1. We use python 3.8, so we can't use asyncio.Runner context manager
  2. We can't use threading, so the solution suggested here would not work

Problem:

How can I wait/await for the async_coroutine, or the task/future provided by loop.create_task(async_coroutine) to be completed?

None of the methods above actually do the waiting, and for the reasons stated in the comments.


Update

I've found this nest_asyncio library that's built to solve this problem exactly:


ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'

HAS_BEEN_RUN = False


def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False):
    global HAS_BEEN_RUN
    if not HAS_BEEN_RUN:
        _apply_nested_asyncio_patch()
        HAS_BEEN_RUN = True
    return asyncio.run(async_coroutine)


def _apply_nested_asyncio_patch():
    try:
        loop = asyncio.get_running_loop()
        logger.info(f'as get_running_loop() returned {loop}, this environment has it`s own event loop.\n'
                    f'Patching with nest_asyncio')
        import nest_asyncio
        nest_asyncio.apply()
    except RuntimeError as e:
        if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
            logger.info(f'as get_running_loop() raised {e}, this environment does not have it`s own event loop.\n'
                        f'No patching necessary')
        else:
            raise e

Still, there are some issues I'm facing with it:

  1. As per this SO answer, there might be starvation issues
  2. Any logs written in the async_coroutine are not printed in the jupyter notebook
  3. The jupyter notebook kernel occasionally crashes upon completion of the task

Edit

For context, the library internally calls external APIs for data enrichment of a user-provided dataframe:

# user code using the library
import my_lib

df = pd.DataFrame(data='some data')
enriched_df = my_lib.enrich(df)

Solution

  • It's usually a good idea to expose the asynchronous function. This way you will give your users more flexibility.

    If some of your users can't (or don't want to) use asynchronous calls to your functions, they will be able to call the async function using asyncio.run(your_function()). Or in the rare situation where they have an event loop running but can't make async calls they could use the create_task + add_one_callback method described here. (I really have no idea why such a use case may happen, but for the sake of the argument I included it.)

    Hidding the asynchronous interface from your users is not the best idea because it limits their capabilities. They will probably fork your package to patch it and make the exposed function async or call the hidden async function directly. None of which is good news for you (harder to document / track bugs). I would really suggest to stick to the simplest solution and provide the async functions as the main entry points.

    Suppose the following package code followed by 3 different usage of it:

    async def package_code():
        return "package"
    

    Client codes

    Typical clients will probably just use it this way:

    async def client_code_a():
        print(await package_code())
    
    # asyncio.run(client_code_a())
    

    For some people, the following might make sense. For example if your package is the only asynchronous thing they will ever use. Or maybe they are not yet confortable using async code (these you can probably convince to try client_code_a instead):

    def client_code_b():
        print(asyncio.run(package_code()))
    
    # client_code_b()
    

    The very few (I'm tempted to say none):

    async def client_code_c():
        # asyncio.run() cannot be called from a running event loop:
        # print(asyncio.run(package_code()))
        loop = asyncio.get_running_loop()
        task = loop.create_task(package_code())
        task.add_done_callback(lambda t: print(t.result()))
    
    # asyncio.run(client_code_c())