Tags: python, python-3.x, asynchronous, concurrency, python-asyncio

How to limit concurrency with Python asyncio?


Let's assume we have a bunch of links to download, and each link may take a different amount of time to download. I'm allowed to download using at most 3 connections. Now, I want to ensure that I do this efficiently using asyncio.

Here's what I'm trying to achieve: at any point in time, try to ensure that I have at least 3 downloads running.

Connection 1: 1---------7---9---
Connection 2: 2---4----6-----
Connection 3: 3-----5---8-----

The numbers represent the download links, while the hyphens represent time spent waiting for a download to finish.

Here is the code that I'm using right now:

from random import randint
import asyncio

count = 0


async def download(code, permit_download, no_concurrent, downloading_event):
    global count
    downloading_event.set()
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))
    count -= 1
    if count < no_concurrent and not permit_download.is_set():
        permit_download.set()


async def main(loop):
    global count
    permit_download = asyncio.Event()
    permit_download.set()
    downloading_event = asyncio.Event()
    no_concurrent = 3
    i = 0
    while i < 9:
        if permit_download.is_set():
            count += 1
            if count >= no_concurrent:
                permit_download.clear()
            loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
            await downloading_event.wait()  # To force context to switch to download function
            downloading_event.clear()
            i += 1
        else:
            await permit_download.wait()
    await asyncio.sleep(9)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

And the output is as expected:

downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 2 second(s)
downloaded 0
downloading 4 will take 3 second(s)
downloaded 1
downloaded 3
downloading 5 will take 2 second(s)
downloading 6 will take 2 second(s)
downloaded 5
downloaded 6
downloaded 4
downloading 7 will take 1 second(s)
downloading 8 will take 1 second(s)
downloaded 7
downloaded 8

But here are my questions:

  1. At the moment, I'm simply waiting for 9 seconds to keep the main function running until the downloads are complete. Is there an efficient way of waiting for the last download to complete before exiting the main function? (I know there's asyncio.wait, but I'd need to store all the task references for it to work.)

  2. What's a good library that does this kind of task? I know JavaScript has a lot of async libraries, but what about Python?

Edit: 2. What's a good library that takes care of common async patterns? (Something like JavaScript's async library.)


Solution

  • Before reading the rest of this answer, please note that the idiomatic way of limiting the number of parallel tasks with asyncio is to use asyncio.Semaphore, as shown in Mikhail's answer and elegantly encapsulated in Andrei's answer; a minimal sketch of that approach follows below. This answer presents working, but somewhat more complicated, ways of achieving the same thing. I am leaving it here because in some cases this approach can have advantages over a semaphore, specifically when the number of items to process is very large or unbounded and you cannot create all the coroutines in advance. In that case the second (queue-based) solution in this answer is what you want. But in most everyday situations, such as parallel downloads through aiohttp, you should use a semaphore instead.
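
    For reference, a minimal sketch of the semaphore approach, reusing the same dummy download() as the question (this is my illustration, not code copied from either linked answer):

    import asyncio, random

    async def download(code, sem):
        async with sem:  # at most 3 downloads hold the semaphore at once
            wait_time = random.randint(1, 3)
            print('downloading {} will take {} second(s)'.format(code, wait_time))
            await asyncio.sleep(wait_time)
            print('downloaded {}'.format(code))

    async def main():
        sem = asyncio.Semaphore(3)  # the concurrency limit
        await asyncio.gather(*(download(i, sem) for i in range(9)))

    # run with: asyncio.run(main())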


    You basically need a fixed-size pool of download tasks. asyncio doesn't come with a pre-made task pool, but it is easy to create one: simply keep a set of tasks and don't allow it to grow past the limit. Although the question states your reluctance to store the task references, the code ends up much more elegant:

    import asyncio, random
    
    async def download(code):
        wait_time = random.randint(1, 3)
        print('downloading {} will take {} second(s)'.format(code, wait_time))
        await asyncio.sleep(wait_time)  # I/O, context will switch to main function
        print('downloaded {}'.format(code))
    
    async def main(loop):
        no_concurrent = 3
        dltasks = set()
        for i in range(9):
            if len(dltasks) >= no_concurrent:
                # Wait for some download to finish before adding a new one
                _done, dltasks = await asyncio.wait(
                    dltasks, return_when=asyncio.FIRST_COMPLETED)
            dltasks.add(loop.create_task(download(i)))
        # Wait for the remaining downloads to finish
        await asyncio.wait(dltasks)
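
    To run this standalone, the same driver as in the question works (a sketch using the same pre-asyncio.run event-loop API as the rest of the code):

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()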
    

    An alternative is to create a fixed number of coroutines doing the downloading, much like a fixed-size thread pool, and feed them work using an asyncio.Queue. This removes the need to limit the number of downloads manually: concurrency is automatically capped by the number of worker coroutines invoking download():

    # download() defined as above
    
    async def download_worker(q):
        while True:
            code = await q.get()  # blocks until an item is available
            await download(code)
            q.task_done()  # lets q.join() track completed items
    
    async def main(loop):
        q = asyncio.Queue()
        workers = [loop.create_task(download_worker(q)) for _ in range(3)]
        for i in range(9):
            await q.put(i)
        await q.join()  # wait for all tasks to be processed
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
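
    Note the shutdown sequence: q.join() returns only once every enqueued item has been matched by a q.task_done() call, at which point the workers are idle, blocked inside q.get(). Cancelling them then breaks their infinite loops, and gathering with return_exceptions=True collects the resulting CancelledErrors instead of propagating them.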
    

    As for your other question, the obvious choice would be aiohttp.
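
    For illustration, here is a rough sketch of combining aiohttp with the semaphore pattern (the urls list is hypothetical; session.get() and resp.read() are real aiohttp calls):

    import asyncio
    import aiohttp

    async def fetch(session, sem, url):
        async with sem:  # at most 3 requests in flight at once
            async with session.get(url) as resp:
                return await resp.read()

    async def main(urls):
        sem = asyncio.Semaphore(3)
        async with aiohttp.ClientSession() as session:
            return await asyncio.gather(
                *(fetch(session, sem, url) for url in urls))

    # run with: asyncio.run(main(urls)) for some list of urls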