I've found myself in a tricky situation which I can't seem to find my way out of:
loop.run_until_complete
. I then used a queue.Queue
to block execution on the main thread until the secondary thread with the second event loop returned a result. Unfortunately, this threw errors due to trying to run tasks in the wrong event loop. It seems I can't use the client session outside the event loop where it was created.asyncio.to_thread
) and then trying to use loop.run_until_complete
in the callback (with loop
here referencing the single main event loop). Unfortunately, this failed because "this event loop is already running." It didn't seem to matter that it was only await
ing the weasyprint thread, it still wouldn't let me run another coroutine in the loop.I do understand that, obviously, weasyprint won't be able to take advantage of the performance benefits of using async code, but simply for the sake of not having to write the same code twice (a sync version and an async version) I'd like to find a way to force it to use an async callback, even though that callback in reality just runs synchronously.
I really only have a surface-level understanding of asyncio; I don't know a lot of the lower-level details about how event loops work and all that, so I'm sure the above approaches are a bit naive, but it seems to me like there ought to be some way to accomplish this.
def print_weasyprint(html:IOBase, resources:aiohttp.ClientSession):
out = io.BytesIO()
HTML(
file_obj=html,
url_fetcher=_make_url_fetcher(resources),
).write_pdf(out)
return out
def _call_async_secondary_loop(coro):
q = queue.Queue(1)
def secondary():
loop = asyncio.new_event_loop()
q.put(loop.run_until_complete(coro))
Thread(target=secondary).start()
return q.get()
def _make_url_fetcher(resources:aiohttp.ClientSession):
def fetcher(url, *args, **kwargs):
content = _call_async_secondary_loop(_fetch(resources, url))
return {'string':content}
return fetcher
async def _fetch(resources, url):
async with reources.get(url) as resource:
return await resource.content.read()
(Here, although print_weasyprint
is sync, but is being called from an async context in the application, so it's still running in the event loop.)
async def print_weasyprint(html:io.IOBase, resources:aiohttp.ClientSession):
loop = asyncio.get_running_loop()
fetcher = _make_url_fetcher(resources, loop)
return await asyncio.to_thread(_print_weasyprint_sync, fetcher, html, resources, out)
def _print_weasyprint_sync(fetcher, html:io.IOBase, resources:aiohttp.ClientSession):
out = io.BytesIO()
HTML(
file_obj=html,
url_fetcher=fetcher,
).write_pdf(out)
return out
def _make_url_fetcher(resources:aiohttp.ClientSession, loop):
def fetcher(url, *args, **kwargs):
content, content_type = loop.run_until_complete(_fetch(resources, url))
return {
'string':content,
'mime_type':content_type,
}
return fetcher
async def _fetch(resources:aiohttp.ClientSession, url):
async with resources.get(url) as resource:
return await resource.content.read(), resource.content_type
To summarize: Your async function calls a sync function which calls some sync api function that takes a sync callback as an argument that really needs to execute as an async function.
Since directly calling a sync function from an async function blocks the event loop until the sync function returns, we need to "call" the sync function using either asyncio.to_thread
or asyncio.run_in_executor
. When the sync callback is invoked, it will be running in a different thread other than the main event loop thread. Thus, it must invoke the async callback using asyncio.run_coroutine_threadsafe
.
In the following demo we have the following functions:
main_async_function
: The initial async function that needs to invoke a sync function, sync_function
.sync_function
: This is invoked by main_async_function
and needs to make an api call and return the result. It also needs to specify a callback whose main logic will run as an async function.sync_callback
: This is the callback passed to the api function. It needs to run the actual logic in async_callback
.async_callback
: The callback logic running in the main event loop.api
: A function that emulates some api function call.import asyncio
from collections.abc import Callable
from functools import partial
async def main_async_function() -> None:
argument: int = 1
# "Call" sync function runnning in another thread
# so as to not block the event loop:
print('api call argument:', argument)
result = await asyncio.to_thread(sync_function, asyncio.get_running_loop(), argument)
print('api call result:', result)
def sync_function(loop: asyncio.AbstractEventLoop, argument: int) -> int:
# Call the api specifying our sync callback and return the api result:
return api(argument, partial(sync_callback, loop))
def sync_callback(loop: asyncio.AbstractEventLoop, some_value: int) -> None:
future = asyncio.run_coroutine_threadsafe(async_callback(some_value), loop)
future.result() # Wait for the callback to complete
async def async_callback(argument: int) -> None:
print('callback argument:', argument)
... # Do some work
def api(argument: int, callable: Callable[[int], None]) -> int:
... # simulate doing some work:
result = argument * 2
callable(result)
return result
if __name__ == '__main__':
asyncio.run(main_async_function())
Prints:
api call argument: 1
callback argument: 2
api call result: 2