I am new to async programming and I am trying to understand how `TaskGroup` can be used with `asyncio.Queue`. I have the module and test below, but when I run `pytest` it prints the queue items and then just hangs (deadlocks?). Any suggestions on what I am doing wrong?
Module: AsynchronousQueueBeta.py
from asyncio import Queue, TaskGroup
class AsynchronousQueueBeta:
"""Asynchronous Queue Beta"""
async def fetch_recursive(self, source_list: list[str], maximum_connection: int = 10):
"""Fetch Recursive"""
print('Fetch Recursive')
query_queue = Queue()
for source in source_list:
query_queue.put_nowait(source)
async with TaskGroup() as group:
task_list = [
group.create_task(self.fetch_query(query_queue)) for _ in range(maximum_connection)
]
await query_queue.join()
result_list = [task.result() for task in task_list]
print(f'Result List: {result_list}')
async def fetch_query(self, queue: Queue):
"""Fetch Query"""
while True:
query = await queue.get()
print(f'Query: {query}')
queue.task_done()
Test: TestAsynchronousQueueBeta.py
import pytest
from AsynchronousQueueBeta import AsynchronousQueueBeta
class TestAsynchronousQueueBeta:
    """Test Asynchronous Queue Beta"""

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        'source_list', [
            [f'https://httpbin.org/anything/{index}' for index in range(1, 13)],
        ]
    )
    async def test_fetch_recursive(self, source_list: list[str]):
        """Test Fetch Recursive.

        The call is wrapped in ``asyncio.wait_for``. If the worker pool never
        shuts down, the test fails with ``TimeoutError`` instead of hanging
        the whole pytest run until it is interrupted.
        """
        import asyncio

        beta = AsynchronousQueueBeta()
        await asyncio.wait_for(
            beta.fetch_recursive(
                source_list=source_list,
            ),
            timeout=5,
        )
Result
platform darwin -- Python 3.12.1, pytest-7.4.4, pluggy-1.3.0 -- /Users/abc/Desktop/Project/Workspace/Python/pv312/bin/python3.12
cachedir: .pytest_cache
rootdir: /Users/abc/Desktop/Project/Async
configfile: pytest.ini
plugins: asyncio-0.23.3, anyio-4.2.0
asyncio: mode=Mode.STRICT
collected 1 item
Test/TestAsynchronousQueueBeta.py::TestAsynchronousQueueBeta::test_fetch_recursive[source_list0] Fetch Recursive
Query: https://httpbin.org/anything/1
Query: https://httpbin.org/anything/2
Query: https://httpbin.org/anything/3
Query: https://httpbin.org/anything/4
Query: https://httpbin.org/anything/5
Query: https://httpbin.org/anything/6
Query: https://httpbin.org/anything/7
Query: https://httpbin.org/anything/8
Query: https://httpbin.org/anything/9
Query: https://httpbin.org/anything/10
Query: https://httpbin.org/anything/11
Query: https://httpbin.org/anything/12
^C
!!! KeyboardInterrupt !!!
/opt/python/3.12.1/lib/python3.12/selectors.py:566: KeyboardInterrupt
(to show a full traceback on KeyboardInterrupt use --full-trace)
...
I think I figured it out (in a sense): I put a signal, or sentinel value, on the queue to tell the workers when it is done. Here's an example; I also have a full example with a test.
async def fetch_recursive(
    self,
    source_list: list[str],
    maximum_task: int = 10,
):
    """Fan *source_list* out to ``maximum_task`` workers and gather results.

    Steps:

    1. Start the workers.
    2. Enqueue every source.
    3. Enqueue one ``None`` sentinel per worker, so each worker stops
       after reading one.

    Leaving the ``TaskGroup`` block waits for every worker to finish, so
    the result queue is complete when it is drained.

    Args:
        source_list: Items to hand to the workers.
        maximum_task: Number of worker tasks to start.

    Returns:
        The results in the order the workers put them on the result queue.
        Each result is also printed.
    """
    print('Fetch Recursive')
    query_queue = Queue()
    result_queue = Queue()
    async with TaskGroup() as group:
        for index in range(maximum_task):
            group.create_task(
                self.fetch_query(
                    name=f'Worker-{index + 1}',
                    query_queue=query_queue,
                    result_queue=result_queue,
                )
            )
        for source in source_list:
            await query_queue.put(source)
        # One sentinel per worker: each worker exits on the first None it reads.
        for _ in range(maximum_task):
            await query_queue.put(None)
    # All workers have finished, so no further results can arrive; drain
    # the queue without awaiting.
    result_list = []
    while not result_queue.empty():
        result_list.append(result_queue.get_nowait())
    # Display the result(s)
    for result in result_list:
        print(f'Result: {result}')
    return result_list
async def fetch_query(
self,
name: str,
query_queue: Queue,
result_queue: Queue,
):
while True:
query = await query_queue.get()
if query is None:
break
print(f'{name} Fetch Query: {query}')
await asyncio.sleep(1)
result = f'{name} Result: {query}'
await result_queue.put(result)