Consider this example where I have 3 worker tasks that push results into a queue and one task that deals with the pushed data.
import asyncio

# do_some_work() and handle_result() are placeholders for the real coroutines.

async def worker1(queue: asyncio.Queue):
    while True:
        res = await do_some_work(param=1)
        await queue.put(res)

async def worker2(queue: asyncio.Queue):
    while True:
        res = await do_some_work(param=2)
        await queue.put(res)

async def worker3(queue: asyncio.Queue):
    while True:
        res = await do_some_work(param=3)
        await queue.put(res)

async def handle_results(queue: asyncio.Queue):
    while True:
        res = await queue.get()
        await handle_result(res)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    t1 = asyncio.create_task(worker1(queue))
    t2 = asyncio.create_task(worker2(queue))
    t3 = asyncio.create_task(worker3(queue))
    # Start the consumer task (handle_results, not handle_result).
    handler = asyncio.create_task(handle_results(queue))
    while True:
        # do some other stuff
        ...

asyncio.run(main())
The documentation says that asyncio.Queue is not thread-safe, but this should not apply here because all tasks are running in the same thread. But do I need an asyncio.Lock to protect the queue when I have 3 tasks that push into the same queue? Looking at the implementation in Python 3.12 (which creates a putter future and awaits it before pushing into the queue), I would say no, but I'm not sure, and the documentation does not mention what would happen in this case. So, is the asyncio.Lock necessary in this case?
No, there is no need for locks to put or read items from asyncio queues.
Keep in mind that multithreaded Python code already requires far fewer locks than most code in other languages, because the built-in data structures themselves are thread-safe. Even with a free-threading build (without the GIL), if you have several threads appending values to a list, for example, the list will always be in a consistent state. Of course, code that modifies or creates keys in a shared dictionary will still need proper locks to keep its own logic consistent, even though the dictionary itself won't ever "break".
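As a rough illustration of the threading point above, here is a minimal sketch. The names shared_list, counts, counts_lock, work and run_threads are made up for this example. Each thread appends to a shared list without a lock, while the read-modify-write on the shared dictionary is wrapped in a lock.

```python
import threading

# Hypothetical shared state, used only for this illustration.
shared_list = []
counts = {}
counts_lock = threading.Lock()


def work(tag: int, n: int) -> None:
    for i in range(n):
        # A single append leaves the list consistent, so no lock is used here.
        shared_list.append((tag, i))
        # Read-modify-write on a shared dict: lock it so no update is lost.
        with counts_lock:
            counts[tag % 2] = counts.get(tag % 2, 0) + 1


def run_threads(n_threads: int = 4, n: int = 1000):
    shared_list.clear()
    counts.clear()
    threads = [
        threading.Thread(target=work, args=(tag, n)) for tag in range(n_threads)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return len(shared_list), dict(counts)
```

Every appended item should end up in the list, and the locked counters should add up to the total number of iterations.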
When we step down to async programming, other concurrent tasks will only ever run when our code hits an await expression (or an async for or async with statement), so the need for locks is reduced even further.
In other words, if there is no code running in other threads, even with a lot of concurrent tasks, things like this:
value = global_list[0]
new_value = (complicated expression using value)
global_list[0] = new_value
are concurrency-safe in async code, as long as there is no await between reading the value and writing it back.
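To make that concrete, here is a small sketch (the names counter, safe_increment, racy_increment and run are invented for this example). It contrasts a read-modify-write with no await in the middle against one that yields to the event loop between the read and the write.

```python
import asyncio

# Hypothetical shared state for the illustration.
counter = [0]


async def safe_increment() -> None:
    # No await between the read and the write: no other task can interleave.
    value = counter[0]
    counter[0] = value + 1


async def racy_increment() -> None:
    value = counter[0]
    # Yielding here lets other tasks read the same stale value.
    await asyncio.sleep(0)
    counter[0] = value + 1


async def run(increment, n: int = 100) -> int:
    counter[0] = 0
    await asyncio.gather(*(increment() for _ in range(n)))
    return counter[0]
```

Calling asyncio.run(run(safe_increment)) should give 100, while the racy variant loses updates and ends up with a smaller number. That second case is the one where an asyncio.Lock (or restructuring the code) would actually be needed.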
And on top of that, asyncio queues are built for consistency in async contexts. They will never break or get into an inconsistent state if two concurrent tasks call put, get, or the _nowait variants of those (put_nowait and get_nowait) in code running in the same thread. (If you need to put data from another thread to be consumed by an asynchronous task, that is another matter and requires a carefully developed pattern to work.)
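For the cross-thread case, one commonly used pattern (a sketch, not the only option) is to never touch the queue from the other thread directly, and instead ask the event loop to perform the put via loop.call_soon_threadsafe. The names producer_thread and consume are hypothetical, and the queue is assumed to be unbounded so put_nowait cannot raise asyncio.QueueFull.

```python
import asyncio
import threading


def producer_thread(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue, items) -> None:
    for item in items:
        # asyncio.Queue is not thread-safe, so schedule the put on the loop's
        # own thread instead of calling queue.put_nowait() from here.
        loop.call_soon_threadsafe(queue.put_nowait, item)


async def consume(n_items: int = 5) -> list:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()  # unbounded (assumption of this sketch)
    thread = threading.Thread(
        target=producer_thread, args=(loop, queue, list(range(n_items)))
    )
    thread.start()
    received = [await queue.get() for _ in range(n_items)]
    thread.join()  # the producer has already scheduled everything by now
    return received
```

With a bounded queue, something like asyncio.run_coroutine_threadsafe(queue.put(item), loop) would be the safer choice, because put_nowait raises when the queue is full.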
Just go for it. And keep in mind that unless you want to yield to the event loop at the point where you are doing a put, or you are really concerned about constraining the queue size, you can simply call put_nowait (without the await). Since that is a plain synchronous call, it even prevents other tasks from running "near" your put. (On a bounded queue that is full, put_nowait raises asyncio.QueueFull instead of waiting.)
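Here is a self-contained sketch of that idea, loosely modelled on the question's setup. The names producer, collect and demo are invented for this example, and asyncio.sleep(0) stands in for the question's do_some_work. Three producers push into one unbounded queue with put_nowait and no lock, and a single consumer drains it.

```python
import asyncio


async def producer(queue: asyncio.Queue, param: int, count: int) -> None:
    for i in range(count):
        await asyncio.sleep(0)  # stand-in for the real work
        queue.put_nowait((param, i))  # synchronous put, no lock


async def collect(queue: asyncio.Queue, results: list) -> None:
    while True:
        item = await queue.get()
        results.append(item)
        queue.task_done()


async def demo(count: int = 3) -> list:
    queue: asyncio.Queue = asyncio.Queue()  # unbounded, so put_nowait cannot fail
    results: list = []
    consumer = asyncio.create_task(collect(queue, results))
    await asyncio.gather(*(producer(queue, p, count) for p in (1, 2, 3)))
    await queue.join()  # wait until every item has been marked done
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return results
```

Every item from every producer should show up exactly once in the results, although the interleaving between producers is up to the event loop.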