python, python-asyncio

Async producer/consumer running serially


I'm getting started with asyncio and created this:

import asyncio

async def handle_data(q):
    while (item := await q.get()) is not None:
        print(f'Consuming {item}')

async def main():
    q = asyncio.Queue()
    consumer = asyncio.create_task(handle_data(q))

    for k in range(10):
        print(f'Producing {k}')
        await q.put(k)

    await q.put(None)
    await consumer

asyncio.run(main())

The script outputs

Producing 0
Producing 1
Producing 2
Producing 3
Producing 4
Producing 5
Producing 6
Producing 7
Producing 8
Producing 9
Consuming 0
Consuming 1
Consuming 2
Consuming 3
Consuming 4
Consuming 5
Consuming 6
Consuming 7
Consuming 8
Consuming 9

Why is the code running synchronously?


Solution
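
First, the root cause: asyncio is cooperative, so the consumer task only gets a chance to run when `main` suspends at an `await` that actually waits. On an unbounded `Queue`, `await q.put(k)` completes immediately and never suspends, so `main` produces all ten items (and the `None`) before the event loop ever switches to `handle_data`. A minimal sketch of this behaviour (`tick` and the print messages are illustrative names, not from the question):

```python
import asyncio

async def tick():
    print('tick ran')

async def main():
    task = asyncio.create_task(tick())  # scheduled, but not yet started
    q = asyncio.Queue()                 # unbounded: put() never has to wait
    await q.put('x')                    # completes without suspending main
    print('tick has not run yet')
    await asyncio.sleep(0)              # first suspending await: tick runs
    await task

asyncio.run(main())
```

This prints `tick has not run yet` before `tick ran`: awaiting a put that never blocks does not hand control to the event loop.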

  • You could try a couple of things (but please read the Note at the end for why you might not want to use either of them in real-world code):

    1. Use asyncio.sleep(0), which gives another runnable task a chance to be dispatched:
    import asyncio
    
    async def handle_data(q):
        while (item := await q.get()) is not None:
            print(f'Consuming {item}')
    
    async def main():
        q = asyncio.Queue()
        consumer = asyncio.create_task(handle_data(q))
    
        for k in range(10):
            print(f'Producing {k}')
            await q.put(k)
            await asyncio.sleep(0)  # Allow another task to get dispatched
    
        await q.put(None)
        await consumer
    
    asyncio.run(main())
    

    On my desktop I get:

    Producing 0
    Consuming 0
    Producing 1
    Consuming 1
    Producing 2
    Consuming 2
    Producing 3
    Consuming 3
    Producing 4
    Consuming 4
    Producing 5
    Consuming 5
    Producing 6
    Consuming 6
    Producing 7
    Consuming 7
    Producing 8
    Consuming 8
    Producing 9
    Consuming 9
    
    2. If the above does not work for you, then specify a queue capacity of 1. Each await q.put(...) will then block main until the consumer has taken the previous item off the queue:
    import asyncio
    
    async def handle_data(q):
        while (item := await q.get()) is not None:
            print(f'Consuming {item}')
    
    async def main():
        q = asyncio.Queue(1)  # maxsize is 1
        consumer = asyncio.create_task(handle_data(q))
    
        for k in range(10):
            print(f'Producing {k}')
            await q.put(k)
    
        await q.put(None)
        await consumer
    
    asyncio.run(main())
    

    Note

    I am not sure what either change accomplishes for you, though: main and handle_data still run essentially in lockstep, and you get no overlap in processing with the code you posted. Suppose handle_data did some relatively long I/O operation, such as retrieving a URL from a web site, and there were several such URLs to retrieve. If you wanted a fixed number of concurrent tasks to do the retrieving, you would want to put the URLs on the queue as quickly as possible, not wait for each one to be consumed before putting the next:

    import asyncio
    
    async def fetch_url(q):
        while (url := await q.get()) is not None:
            # Code to fetch data from the url using aiohttp, for example:
            ...
    
    async def main():
        q = asyncio.Queue()
    
        urls = ['url1', 'url2', 'url3', 'url4']
        n_tasks = 2
        tasks = [asyncio.create_task(fetch_url(q)) for _ in range(n_tasks)]
    
        for url in urls:
            await q.put(url)
    
        for _ in range(n_tasks):
            await q.put(None)
    
        await asyncio.gather(*tasks)
    
    asyncio.run(main())
    

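To see the overlap this buys you, here is a runnable sketch of the same worker-pool pattern in which asyncio.sleep(0.1) stands in for the real network I/O (the timing figure in the comment is an expectation under normal scheduling, not a guarantee):

```python
import asyncio
import time

async def fetch_url(q, results):
    # asyncio.sleep stands in for a real aiohttp request
    while (url := await q.get()) is not None:
        await asyncio.sleep(0.1)
        results.append(url)

async def main():
    q = asyncio.Queue()
    results = []
    n_tasks = 2
    tasks = [asyncio.create_task(fetch_url(q, results)) for _ in range(n_tasks)]

    for url in ['url1', 'url2', 'url3', 'url4']:
        await q.put(url)
    for _ in range(n_tasks):
        await q.put(None)

    start = time.monotonic()
    await asyncio.gather(*tasks)
    elapsed = time.monotonic() - start
    # With 2 workers, four 0.1 s "fetches" take about 0.2 s, not 0.4 s
    print(f'fetched {len(results)} urls in {elapsed:.2f}s')

asyncio.run(main())
```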
    But if you wanted N tasks to retrieve N URLs, then you would want to code this without using a queue at all:

    import asyncio
    
    async def fetch_url(url):
        # Code to fetch data from the url using aiohttp, for example:
        ...
    
    async def main():
        urls = ['url1', 'url2', 'url3', 'url4']
    
        await asyncio.gather(*(fetch_url(url) for url in urls))
    
    asyncio.run(main())
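
And if you have many URLs but still want to cap how many fetches run at once, an asyncio.Semaphore is a common alternative to the worker-pool-plus-queue approach. A sketch under the same assumptions (asyncio.sleep(0.1) stands in for the real request):

```python
import asyncio

async def fetch_url(url, sem):
    async with sem:                 # at most 2 fetches in flight at once
        await asyncio.sleep(0.1)    # placeholder for the real aiohttp call
        return url

async def main():
    urls = ['url1', 'url2', 'url3', 'url4']
    sem = asyncio.Semaphore(2)
    # gather returns results in argument order
    results = await asyncio.gather(*(fetch_url(url, sem) for url in urls))
    print(results)

asyncio.run(main())
```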