python, asynchronous, async-await, python-asyncio, coroutine

asyncio: collecting results from an async function in an executor


I would like to start a large number of HTTP requests and collect their results, once all of them have returned. Sending the requests in a non-blocking fashion is possible with asyncio, but I have problems collecting their results.

I'm aware of solutions such as aiohttp that are made for this specific problem. But the HTTP requests are just an example, my question is how to use asyncio correctly.

On the server side, I have Flask, which answers every request to localhost/ with "Hello World!", but it waits 0.1 seconds before answering. In all my examples, I'm sending 10 requests. Synchronous code should take about 1 second; an asynchronous version could do it in about 0.1 seconds.
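For reference, the server is essentially this (a minimal sketch; only the route, the response, and the 0.1 second delay are from my description above, the rest is standard Flask boilerplate):

from time import sleep
from flask import Flask

app = Flask(__name__)

@app.route("/")
def hello():
    sleep(0.1)  # wait before answering
    return "Hello World!"

app.run(port=5000)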

On the client side, I want to fire off many requests at the same time and collect their results. I'm trying to do this in three different ways. Since asyncio needs an executor to work around blocking code, all of the approaches call loop.run_in_executor.

This code is shared between them:

import requests
from time import perf_counter
import asyncio

loop = asyncio.get_event_loop()

async def request_async():
    # a coroutine function - but note that its body makes a blocking call
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

def request_sync():
    # a plain function making the same blocking call
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

Approach 1:

Use asyncio.gather() on a list of tasks, then run_until_complete. After reading Asyncio.gather vs asyncio.wait, it seemed like gather would wait on the results. But it doesn't: this code returns instantly, without waiting for the requests to finish. If I use a blocking function here, it works. Why can't I use an async function?

# approach 1
start = perf_counter()
tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_async)) # <---- using an async function!

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}") # 0.003

# approach 1(B)
start = perf_counter()
tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_sync)) # <---- using sync function

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)

stop = perf_counter()
print(f"finished {stop - start}") # 0.112

Python even warns me that coroutine "request_async" was never awaited. At this point, I have a working solution: using a normal (not async) function in an executor. But I would like a solution that works with async function definitions, because I would like to use await inside them (not necessary in this simple example, but it will surely matter as I move more code to asyncio).
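Printing one of the gathered "results" from approach 1 (a quick check, separate from the timing code above) shows a coroutine object instead of response text:

# the executor called request_async() in a worker thread and handed back
# the coroutine object it returned, without ever executing its body
print(results[0])  # <coroutine object request_async at 0x...>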

Approach 2:

Python warns me that my coroutines are never awaited, so let's await them. Approach 2 wraps all the code in an outer async function and awaits the result of the gather. Same problem: it also returns instantly (and gives the same warning):

# approach 2
async def main():

    tasks = []
    for i in range(10):
        tasks.append(loop.run_in_executor(None, request_async))

    gathered_tasks = asyncio.gather(*tasks)

    return await gathered_tasks # <-------- here I'm waiting on the coroutine 

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}")  # 0.0036

This really confused me. I'm awaiting the result of gather. Intuitively, that should be propagated to the coroutines that I'm gathering. But Python still complains that my coroutines are never awaited.

I read some more and found: How could I use requests in asyncio?

This is pretty much exactly my example: combining requests and asyncio, which brings me to approach 3:

Approach 3:

Same structure as approach 2, but await each task that was passed to run_in_executor() individually (surely this counts as awaiting the coroutine):

# approach 3:
# wrapping executor in coroutine
# awaiting every task individually
async def main():

    tasks = []
    for i in range(10):
        task = loop.run_in_executor(None, request_async)
        tasks.append(task)

    responses = []
    for task in tasks:
        response = await task
        responses.append(response)

    return responses

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()

print(f"finished {stop - start}") # 0.004578

My question is: I want to have blocking code in my coroutines and run them in parallel with an executor. How do I get their results?


Solution

  • My question is: I want to have blocking code in my coroutines and run them in parallel with an executor. How do I get their results?

    The answer is that you're not supposed to have blocking code in your coroutines. If you must have it, you have to isolate it using run_in_executor. So the correct way to write request_async using requests is:

    async def request_async():
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, request_sync)
    

    Passing request_async to run_in_executor doesn't make sense, because the entire point of run_in_executor is to invoke a sync function in a different thread. If you give it a coroutine function, it will happily call it (in another thread) and provide the returned coroutine object as the "result". That's like passing a generator function to code that expects an ordinary function - yes, it will call it just fine, but it will not know what to do with the returned object.
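    To illustrate with a plain generator (a standalone sketch, not code from the question):

    def gen():
        yield 1

    def call_it(fn):
        # expects fn() to return a plain value
        return fn()

    print(call_it(gen))  # <generator object gen at 0x...> - not the number 1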

    In general, you cannot just put async in front of the def and expect to get a usable coroutine. A coroutine must not block, except by awaiting other asynchronous code.
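    For example (a sketch to show the difference; the names bad_coro and good_coro are made up):

    import asyncio
    import time

    async def bad_coro():
        time.sleep(0.1)           # blocks the event loop thread; nothing else can run
        return "done"

    async def good_coro():
        await asyncio.sleep(0.1)  # suspends this coroutine; the loop stays free
        return "done"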

    Once you have a usable request_async, you can collect its results with standard tools like asyncio.gather:

    async def main():
        coros = [request_async() for _i in range(10)]
        results = await asyncio.gather(*coros)
        return results
    
    results = loop.run_until_complete(main())
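    If you want to control how many blocking calls run at once, you can also pass an explicit ThreadPoolExecutor instead of None (a sketch; max_workers=10 is an assumed tuning choice, not something prescribed here):

    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=10)  # one thread per request

    async def request_async():
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(executor, request_sync)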