Tags: python, python-3.x, python-asyncio

Converting small functions to coroutines


I feel like there is a gap in my understanding of async IO: is there a benefit to wrapping small functions into coroutines, within the scope of larger coroutines? Is there a benefit to this in signaling the event loop correctly? Does the extent of this benefit depend on whether the wrapped function is IO or CPU-bound?

Example: I have a coroutine, download(), which:

  1. Downloads JSON-serialized bytes from an HTTP endpoint via aiohttp.
  2. Compresses those bytes via bz2.compress(), which is not itself awaitable.
  3. Writes the compressed bytes to S3 via aioboto3.

So parts 1 & 3 use predefined coroutines from those libraries; part 2 does not, by default.

Dumbed-down example:

import bz2
import io
import aiohttp
import aioboto3

async def download(endpoint, bucket_name, key):
    async with aiohttp.ClientSession() as session:
        async with session.request("GET", endpoint, raise_for_status=True) as resp:
            raw = await resp.read()  # payload (bytes)
            # Yikes - isn't it bad to throw a synchronous call into the middle
            # of a coroutine?
            comp = bz2.compress(raw)
            async with (
                aioboto3.session.Session()
                .resource('s3')
                .Bucket(bucket_name)
            ) as bucket:
                await bucket.upload_fileobj(io.BytesIO(comp), key)

As hinted by the comment above, my understanding has always been that throwing a synchronous function like bz2.compress() into a coroutine blocks the event loop while it runs. (Even though bz2.compress() is CPU-bound rather than IO-bound.)

So, is there generally any benefit to this type of boilerplate?

async def compress(*args, **kwargs):
    return bz2.compress(*args, **kwargs)

(And now comp = await compress(raw) within download().)

Voilà, this is now an awaitable coroutine, because a plain return is valid in a native coroutine. Is there a case to be made for using this?

Per this answer, I've seen justification for sprinkling in asyncio.sleep(0) in a similar manner - just to signal back to the event loop that the calling coroutine is willing to yield. Is this right?


Solution

  • So, is there generally any benefit to this type of boilerplate?

    async def compress(*args, **kwargs):
        return bz2.compress(*args, **kwargs)
    

    There is no benefit to it whatsoever. Contrary to expectations, adding an await doesn't guarantee that control will be passed to the event loop - that will happen only if the awaited coroutine actually suspends. Since compress doesn't await anything, it will never suspend, so it's a coroutine in name only.
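    A quick way to see this for yourself: schedule a background task, then await a compress-style coroutine that only does synchronous work. Because the wrapped call never suspends, the event loop never regains control and the background task never gets a turn. (A minimal sketch with made-up names; time.sleep() stands in for bz2.compress().)

```python
import asyncio
import time

async def compress(data):
    time.sleep(0.2)  # stand-in for bz2.compress(): purely synchronous work
    return data

async def main():
    ticks = []

    async def ticker():
        while True:
            ticks.append(None)
            await asyncio.sleep(0.01)

    task = asyncio.create_task(ticker())
    await compress(b"payload")  # never suspends, so ticker() never runs here
    ran_during = len(ticks)
    task.cancel()
    return ran_during

print(asyncio.run(main()))  # 0 - the loop never got control during the await
```

    Had compress() actually suspended (e.g. via run_in_executor), ticker() would have accumulated ticks during the await.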

    Note that adding await asyncio.sleep(0) in coroutines does not solve the problem; see this answer for a more detailed discussion. If you need to run a blocking function, use run_in_executor:

    import asyncio

    async def compress(*args, **kwargs):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, lambda: bz2.compress(*args, **kwargs))
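    With that change, several compressions can genuinely overlap via asyncio.gather - bz2.compress releases the GIL while it runs, so even the default thread pool gives real concurrency. A minimal usage sketch (payload contents are made up):

```python
import asyncio
import bz2

async def compress(data):
    # Hand the blocking call off to the default executor so the
    # event loop stays free while the compression runs.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, bz2.compress, data)

async def main():
    payloads = [bytes([i]) * 100_000 for i in range(4)]
    compressed = await asyncio.gather(*(compress(p) for p in payloads))
    # Verify the round trip: decompressing yields the originals.
    return all(bz2.decompress(c) == p for c, p in zip(compressed, payloads))

print(asyncio.run(main()))  # True
```

    For heavy CPU-bound work where the GIL is not released, passing a concurrent.futures.ProcessPoolExecutor instead of None sidesteps the GIL entirely.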