Tags: python, asynchronous, python-asyncio, aiohttp

Call async code inside sync code inside async code


I've found myself in a tricky situation which I can't seem to find my way out of:

  1. I've written an application which uses asyncio (and, in particular, aiohttp).
  2. In part of this application, I need to use a third-party library (in my case, weasyprint) which is not async.
  3. I want to pass a callback (here, a url fetcher) into this third-party library. This callback must obviously be sync, but I want to use my existing async code as part of this callback. (I already have an aiohttp client session I want to use, and I've implemented some client middlewares in this session which do some useful things. I don't want to have to write a second fully sync version of this code.)

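I've tried two approaches (sample code for each follows):

  1. Running the coroutine on a brand-new event loop in a secondary thread and waiting for its result with a queue.Queue.
  2. Running the weasyprint call in a worker thread with asyncio.to_thread and, inside the callback, calling run_until_complete on the application's already-running event loop.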

I understand that weasyprint won't get any of the performance benefits of async code, but simply to avoid writing the same code twice (a sync version and an async version), I'd like a way to make it use an async callback, even though that callback will effectively run synchronously.

I only have a surface-level understanding of asyncio and don't know much about how event loops work under the hood, so I'm sure my approaches are a bit naive, but it seems like there ought to be some way to accomplish this.

Sample code using my first attempted method

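import asyncio
import io
import queue
from io import IOBase
from threading import Thread

import aiohttp
from weasyprint import HTML

def print_weasyprint(html:IOBase, resources:aiohttp.ClientSession):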
    out = io.BytesIO()
    HTML(
        file_obj=html,
        url_fetcher=_make_url_fetcher(resources),
    ).write_pdf(out)
    return out

def _call_async_secondary_loop(coro):
    q = queue.Queue(1)
    def secondary():
        loop = asyncio.new_event_loop()
        q.put(loop.run_until_complete(coro))
    Thread(target=secondary).start()
    return q.get()


def _make_url_fetcher(resources:aiohttp.ClientSession):
    def fetcher(url, *args, **kwargs):
        content = _call_async_secondary_loop(_fetch(resources, url))
        return {'string':content}
    return fetcher

async def _fetch(resources, url):
    async with resources.get(url) as resource:
        return await resource.content.read()

(Here, although print_weasyprint is sync, it is called from an async context in the application, so it still runs on the event loop's thread.)
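A likely reason the first method fails can be sketched with the standard library alone: the coroutine is run on a second event loop, while the objects it awaits belong to the first one. The sketch assumes an aiohttp ClientSession behaves like the hypothetical `pending` future below, i.e. it is tied to the loop that created it; `call_in_secondary_loop` and `await_pending` are illustrative names, not part of the original code.

```python
import asyncio
import threading


def call_in_secondary_loop(coro):
    """Mirror of _call_async_secondary_loop: run `coro` on a brand-new event loop
    in another thread and block the calling thread until it finishes."""
    outcome = {}

    def secondary():
        try:
            outcome["value"] = asyncio.run(coro)
        except Exception as exc:  # keep the error instead of losing it in the thread
            outcome["error"] = exc

    worker = threading.Thread(target=secondary)
    worker.start()
    worker.join()  # like q.get(): the main loop's thread is blocked here
    return outcome


async def await_pending(fut):
    return await fut


async def main():
    loop = asyncio.get_running_loop()
    # Hypothetical stand-in for a resource bound to the main loop (e.g. a client session)
    pending = loop.create_future()
    return call_in_secondary_loop(await_pending(pending))


outcome = asyncio.run(main())
print(type(outcome["error"]).__name__)  # RuntimeError
print(outcome["error"])
```

The error message mentions a future "attached to a different loop". Even if the session could be used from the second loop, `q.get()` (here `worker.join()`) keeps the main loop's thread blocked for the whole fetch, so nothing else in the application can run meanwhile.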

Sample code using my second attempted method

import asyncio
import io

import aiohttp
from weasyprint import HTML

async def print_weasyprint(html:io.IOBase, resources:aiohttp.ClientSession):
    loop = asyncio.get_running_loop()
    fetcher = _make_url_fetcher(resources, loop)
    return await asyncio.to_thread(_print_weasyprint_sync, fetcher, html, resources)


def _print_weasyprint_sync(fetcher, html:io.IOBase, resources:aiohttp.ClientSession):
    out = io.BytesIO()
    HTML(
        file_obj=html,
        url_fetcher=fetcher,
    ).write_pdf(out)
    return out


def _make_url_fetcher(resources:aiohttp.ClientSession, loop):
    def fetcher(url, *args, **kwargs):
        content, content_type = loop.run_until_complete(_fetch(resources, url))
        return {
            'string':content,
            'mime_type':content_type,
        }
    return fetcher

async def _fetch(resources:aiohttp.ClientSession, url):
    async with resources.get(url) as resource:
        return await resource.content.read(), resource.content_type
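The second method fails for a different reason: `loop.run_until_complete` refuses to run a loop that is already running, and the application's loop is still running in the main thread while weasyprint runs in the worker thread. A minimal stdlib reproduction, where `fetch_stub` is a hypothetical stand-in for `_fetch`:

```python
import asyncio


async def fetch_stub():
    # Hypothetical stand-in for _fetch(resources, url)
    return b"<svg/>", "image/svg+xml"


def fetcher(loop):
    # The same call the question's fetcher makes, issued from a worker thread
    coro = fetch_stub()
    try:
        return loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # never started; close it to avoid a "never awaited" warning
        return exc


async def main():
    loop = asyncio.get_running_loop()
    return await asyncio.to_thread(fetcher, loop)


result = asyncio.run(main())
print(result)  # This event loop is already running
```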

Solution

  • To summarize: your async function calls a sync function, which calls a sync API function that takes a sync callback as an argument, but the callback's logic really needs to run as an async function.

    Directly calling a sync function from an async function blocks the event loop until the sync function returns, so we need to "call" the sync function with either asyncio.to_thread or loop.run_in_executor. The sync callback is then invoked in that worker thread, not in the event loop's thread, so it must schedule the async callback on the loop with asyncio.run_coroutine_threadsafe and block on the returned concurrent.futures.Future until the result is available.
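Both ways of moving the sync code off the loop's thread can be compared side by side in this sketch. `asyncio.to_thread(func, *args)` is essentially `loop.run_in_executor(None, func, *args)` with the current contextvars copied into the worker thread; `run_in_executor` also accepts an explicit executor. `blocking_work` and `double` are illustrative names only.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def double(x: int) -> int:
    # Async logic that must run on the event loop's thread
    await asyncio.sleep(0)
    return x * 2


def blocking_work(loop: asyncio.AbstractEventLoop, x: int) -> int:
    # Runs in a worker thread: hand the coroutine to the loop and wait for it
    future = asyncio.run_coroutine_threadsafe(double(x), loop)
    return future.result()


async def main() -> tuple[int, int]:
    loop = asyncio.get_running_loop()
    # Option 1: asyncio.to_thread (Python 3.9+), uses the default executor
    a = await asyncio.to_thread(blocking_work, loop, 1)
    # Option 2: loop.run_in_executor with an explicit thread pool
    with ThreadPoolExecutor(max_workers=1) as pool:
        b = await loop.run_in_executor(pool, blocking_work, loop, 2)
    return a, b


result = asyncio.run(main())
print(result)  # (2, 4)
```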

    The demo below uses these functions:

    1. main_async_function: The initial async function that needs to invoke a sync function, sync_function.
    2. sync_function: This is invoked by main_async_function and needs to make an api call and return the result. It also needs to specify a callback whose main logic will run as an async function.
    3. sync_callback: This is the callback passed to the api function. It needs to run the actual logic in async_callback.
    4. async_callback: The callback logic running in the main event loop.
    5. api: A function that emulates an API call.

    import asyncio
    from collections.abc import Callable
    from functools import partial
    
    async def main_async_function() -> None:
        argument: int = 1
        print('api call argument:', argument)
        # "Call" the sync function in another thread
        # so as not to block the event loop:
        result = await asyncio.to_thread(sync_function, asyncio.get_running_loop(), argument)
        print('api call result:', result)
    
    def sync_function(loop: asyncio.AbstractEventLoop, argument: int) -> int:
    
        # Call the api specifying our sync callback and return the api result:
        return api(argument, partial(sync_callback, loop))
    
    def sync_callback(loop: asyncio.AbstractEventLoop, some_value: int) -> None:
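        # Schedule the coroutine on the event loop's thread (safe to call from any thread):
        future = asyncio.run_coroutine_threadsafe(async_callback(some_value), loop)
        future.result()  # Block this worker thread until the callback completes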
    
    async def async_callback(argument: int) -> None:
        print('callback argument:', argument)
        ...  # Do some work
    
    def api(argument: int, callback: Callable[[int], None]) -> int:
        # Simulate doing some work, invoking the callback along the way:
        result = argument * 2
        callback(result)
        return result
    
    if __name__ == '__main__':
        asyncio.run(main_async_function())
    

    Prints:

    api call argument: 1
    callback argument: 2
    api call result: 2
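Applied to the question's weasyprint case, the same pattern could look like the sketch below. To keep it runnable without weasyprint or network access, `fake_render` is a hypothetical stand-in for `HTML(..., url_fetcher=...).write_pdf(out)` and `fake_fetch` stands in for `_fetch` with the aiohttp session. The deadlock guard and the `timeout` parameter of `make_url_fetcher` are additions of this sketch, not part of the answer above.

```python
import asyncio
import concurrent.futures
import io


def make_url_fetcher(fetch, loop, timeout=None):
    """Return a sync url_fetcher that runs the coroutine function `fetch(url)` on `loop`.

    Assumes `fetch` returns (content_bytes, mime_type) and that the fetcher is only
    ever called from a thread other than the one running `loop`.
    """
    def fetcher(url, *args, **kwargs):
        # Waiting on the loop from the loop's own thread would deadlock; fail fast instead.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # no loop running in this thread: safe to block
        else:
            raise RuntimeError("url_fetcher must not be called from an event loop thread")
        future = asyncio.run_coroutine_threadsafe(fetch(url), loop)
        try:
            content, content_type = future.result(timeout)
        except concurrent.futures.TimeoutError:
            future.cancel()  # also requests cancellation of the task on the loop
            raise
        return {"string": content, "mime_type": content_type}

    return fetcher


async def fake_fetch(url):
    # Hypothetical stand-in for _fetch(resources, url) using the aiohttp session
    await asyncio.sleep(0)
    return f"content of {url}".encode(), "text/plain"


def fake_render(url_fetcher, urls):
    # Hypothetical stand-in for HTML(..., url_fetcher=...).write_pdf(out):
    # a sync library that calls url_fetcher for each resource it needs.
    out = io.BytesIO()
    for url in urls:
        out.write(url_fetcher(url)["string"])
    return out


async def print_document(urls):
    loop = asyncio.get_running_loop()
    fetcher = make_url_fetcher(fake_fetch, loop, timeout=5)
    return await asyncio.to_thread(fake_render, fetcher, urls)


out = asyncio.run(print_document(["a.css", "b.png"]))
print(out.getvalue())  # b'content of a.csscontent of b.png'
```

With the real libraries, the fetcher would presumably be built as `make_url_fetcher(partial(_fetch, resources), loop)` (with `functools.partial`) inside the async `print_weasyprint`, and `_print_weasyprint_sync` would be passed to `asyncio.to_thread` just as in the second attempt.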