
How to gather task results in Trio?


I wrote a script that uses a nursery and the asks module to loop over a list of network IDs and call an API for each one. I get responses, but I don't know how to return the data the way I would with asyncio.

I also have a question about limiting API calls to 5 per second.

import asks
import time
import trio

asks.init("trio")
s = asks.Session(connections=4)

async def main():
    start_time = time.time()

    api_key = 'API-KEY'
    org_id = 'ORG-ID'
    networkIds = ['id1','id2','idn']

    url = 'https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600'
    headers = {'X-Cisco-Meraki-API-Key': api_key, 'Content-Type': 'application/json'}

    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers)

    print("Total time:", time.time() - start_time)



async def fetch(url, headers):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)




if __name__ == "__main__":
    trio.run(main)

When I run nursery.start_soon(fetch, ...), I can print the data inside fetch, but how do I return it? I didn't see anything similar to the asyncio.gather(*tasks) function.

Also, limiting the session to 1-4 connections helps me stay below the limit of 5 API calls per second, but is there a built-in way to ensure that no more than 5 API calls are made in any given second?


Solution

  • Returning data: pass the network ID and a shared dict to each fetch task, which stores its response under that ID:

    async def main():
        …
        results = {}
        async with trio.open_nursery() as nursery:
            for i in networkIds:
                nursery.start_soon(fetch, url.format(i), headers, results, i)
        # results is complete here: the nursery waits for every fetch task
    
    async def fetch(url, headers, results, i):
        print("Start: ", url)
        response = await s.get(url, headers=headers)
        print("Finished: ", url, len(response.content), response.status_code)
        results[i] = response
    

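    Alternatively, create a memory channel with trio.open_memory_channel (older Trio releases offered trio.Queue, which is now deprecated) and have each fetch task send its result into it; your main task can then read the results from the receiving end.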

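  • API limit: create a channel pair with send_channel, receive_channel = trio.open_memory_channel(0) (the current replacement for the deprecated trio.Queue) and start a task along these lines:

    async def limiter(send_channel):
        # Release one token every 0.2 s, i.e. roughly 5 calls per second
        while True:
            await trio.sleep(0.2)
            await send_channel.send(None)

    Pass receive_channel to fetch as another argument, and call await receive_channel.receive() before each API call. With a buffer size of 0, tokens cannot pile up while no request is waiting, so consecutive calls stay at least 0.2 s apart. Because limiter never returns, start it in an outer nursery and cancel that nursery's cancel_scope once the fetch tasks are done; otherwise the nursery never exits.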