I'm getting started with asyncio
and created this:
import asyncio

async def handle_data(q):
    while (item := await q.get()) is not None:
        print(f'Consuming {item}')

async def main():
    q = asyncio.Queue()
    consumer = asyncio.create_task(handle_data(q))
    for k in range(10):
        print(f'Producing {k}')
        await q.put(k)
    await q.put(None)
    await consumer

asyncio.run(main())
The script outputs:
Producing 0
Producing 1
Producing 2
Producing 3
Producing 4
Producing 5
Producing 6
Producing 7
Producing 8
Producing 9
Consuming 0
Consuming 1
Consuming 2
Consuming 3
Consuming 4
Consuming 5
Consuming 6
Consuming 7
Consuming 8
Consuming 9
Why is the code running synchronously?
You could try a couple of things (but please read the Note at the end for why you might not want to try either of them in a real-world example). The reason your code runs synchronously is that await q.put(k) on an unbounded queue completes immediately without ever suspending, so main never yields control to the consumer task until it finally awaits the consumer.

First, you can follow each put with await asyncio.sleep(0), which gives another task a chance to get dispatched:

import asyncio

async def handle_data(q):
    while (item := await q.get()) is not None:
        print(f'Consuming {item}')

async def main():
    q = asyncio.Queue()
    consumer = asyncio.create_task(handle_data(q))
    for k in range(10):
        print(f'Producing {k}')
        await q.put(k)
        await asyncio.sleep(0)  # Allow another task to get dispatched
    await q.put(None)
    await consumer

asyncio.run(main())
On my desktop I get:
Producing 0
Consuming 0
Producing 1
Consuming 1
Producing 2
Consuming 2
Producing 3
Consuming 3
Producing 4
Consuming 4
Producing 5
Consuming 5
Producing 6
Consuming 6
Producing 7
Consuming 7
Producing 8
Consuming 8
Producing 9
Consuming 9
Second, you can create the queue with maxsize=1, so that each put blocks main until another task empties the queue:

import asyncio

async def handle_data(q):
    while (item := await q.get()) is not None:
        print(f'Consuming {item}')

async def main():
    q = asyncio.Queue(1)  # maxsize is 1
    consumer = asyncio.create_task(handle_data(q))
    for k in range(10):
        print(f'Producing {k}')
        await q.put(k)
    await q.put(None)
    await consumer

asyncio.run(main())
Note
I am not sure what either change accomplishes for you. main and handle_data still run essentially in lockstep with one another, and you get no overlap in processing with the code you have posted. Suppose instead that handle_data did some relatively long I/O operation, such as retrieving a URL from a web site, and there were several such URLs to retrieve. If you wanted to use a fixed number of concurrent tasks to do the retrieving, you would want to put the URLs on the queue as quickly as possible, not wait for each URL to be taken off the queue before putting the next one:
import asyncio

async def fetch_url(q):
    while (url := await q.get()) is not None:
        # Code to fetch data from the url using aiohttp, for example:
        ...

async def main():
    q = asyncio.Queue()
    urls = ['url1', 'url2', 'url3', 'url4']
    n_tasks = 2
    tasks = [asyncio.create_task(fetch_url(q)) for _ in range(n_tasks)]
    for url in urls:
        await q.put(url)
    for _ in range(n_tasks):
        await q.put(None)
    await asyncio.gather(*tasks)

asyncio.run(main())
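Since the actual fetch body is elided above, here is a runnable sketch of the same worker-pool pattern, with asyncio.sleep standing in for the real network call (which would use aiohttp or similar); the 0.1-second delay and the elapsed-time measurement are purely illustrative:

import asyncio
import time

async def fetch_url(q, results):
    # Worker: pull URLs until the None sentinel arrives.
    # asyncio.sleep simulates the latency of a real fetch.
    while (url := await q.get()) is not None:
        await asyncio.sleep(0.1)  # simulated network I/O
        results.append(url)

async def main():
    q = asyncio.Queue()
    results = []
    urls = ['url1', 'url2', 'url3', 'url4']
    n_tasks = 2
    tasks = [asyncio.create_task(fetch_url(q, results)) for _ in range(n_tasks)]
    for url in urls:
        await q.put(url)
    for _ in range(n_tasks):
        await q.put(None)  # one sentinel per worker
    start = time.monotonic()
    await asyncio.gather(*tasks)
    return results, time.monotonic() - start

results, elapsed = asyncio.run(main())
print(f'fetched {len(results)} urls in {elapsed:.2f}s')

Because the two workers overlap their simulated I/O, the four 0.1-second "fetches" take roughly 0.2 seconds in total rather than 0.4.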
But if you wanted N tasks to retrieve N URLs, then you would want to code this without using a queue at all:
import asyncio

async def fetch_url(url):
    # Code to fetch data from the url using aiohttp, for example:
    ...

async def main():
    urls = ['url1', 'url2', 'url3', 'url4']
    await asyncio.gather(*(fetch_url(url) for url in urls))

asyncio.run(main())
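Again with asyncio.sleep standing in for the elided fetch (the delay value is illustrative only), a runnable sketch of this gather-based version:

import asyncio
import time

async def fetch_url(url):
    # Simulated fetch; real code would use aiohttp, for example
    await asyncio.sleep(0.1)
    return f'data from {url}'

async def main():
    urls = ['url1', 'url2', 'url3', 'url4']
    start = time.monotonic()
    # gather runs all four coroutines concurrently on the event loop
    # and returns their results in the order the coroutines were passed
    results = await asyncio.gather(*(fetch_url(url) for url in urls))
    return results, time.monotonic() - start

results, elapsed = asyncio.run(main())
print(results)

All four simulated fetches overlap, so the total time is roughly 0.1 seconds rather than 0.4.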