I wrote the following factory fixture that creates a ReconnectingConsumer
and, on teardown, awaits its stop() method:
@pytest.fixture
def make_consumer():
    """Factory fixture: return an async-generator function that yields a consumer."""
    async def _make_consumer():
        consumer = ReconnectingConsumer(
            amqp_url=AMQP_URL, queue=INPUT_QUEUE, on_message=None
        )
        yield consumer
        # Teardown: only reached when the generator is resumed after the yield.
        await consumer.stop()
        print('Consumer stopped')
    return _make_consumer
I use it to create a new consumer, run it in a task, and then cancel that task to stop it:
@pytest.mark.asyncio
async def test_pending_todo_set(make_consumer, make_publisher, make_dispatcher_service):
    """Create a consumer, run it in a task, then cancel and await that task."""
    # Calling the factory creates the async generator object; nothing runs yet.
    make_consumer_gen = make_consumer()
    # Advance the generator to its yield to obtain the consumer.
    consumer_1: ReconnectingConsumer = await anext(make_consumer_gen)
    tasks: List[asyncio.Task] = []
    tasks.append(asyncio.create_task(consumer_1.run()))
    for t in tasks:
        t.cancel()
    # return_exceptions=True collects the cancellations as results instead of raising.
    _ = await asyncio.gather(*tasks, return_exceptions=True)
    # NOTE: make_consumer_gen is never resumed or closed in this test, so the
    # code after its yield does not run here.
However, even though all the tasks
are awaited, I still get the error:
2024-05-22 12:30:13,673 | ERROR | base_events:1819 | Task was destroyed but it is pending!
task: <Task pending name='Task-3' coro=<<async_generator_athrow without __name__>()>>
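The reason it happens is that you didn't call aiter
to get an async iterator
from the function and drive it to completion: the async generator stays suspended at its yield, so the event loop has to close it on shutdown. So it could be like this: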
@pytest.mark.asyncio
async def test_pending_todo_set(make_consumer, make_publisher, make_dispatcher_service):
    # The fixture value is the factory itself, so a single call already gives
    # the async generator object; calling that object again would raise
    # TypeError because async generators are not callable.
    make_consumer_gen = make_consumer()
    async_iterator = aiter(make_consumer_gen)
    # Advance to the generator's yield to obtain the consumer.
    consumer_1: ReconnectingConsumer = await anext(async_iterator)
But the problem here is that you need to call anext
again to resume the generator after the yield so that the teardown code runs,
and that second call will raise a StopAsyncIteration
exception.
Another option is to use async for,
which will handle aiter
and anext
as well as StopAsyncIteration
for you. It is also possible to use a context manager like this, and it should work:
from contextlib import asynccontextmanager
@pytest.fixture
def make_consumer():
    """Factory fixture: return a function that builds an async context manager.

    Entering the context manager yields a new ReconnectingConsumer; leaving it
    awaits ``consumer.stop()``, also when the ``async with`` body raises.
    """
    @asynccontextmanager
    async def _make_consumer():
        consumer = ReconnectingConsumer(
            amqp_url=AMQP_URL, queue=INPUT_QUEUE, on_message=None
        )
        try:
            yield consumer
        finally:
            # An exception raised inside the ``async with`` body (e.g. a failing
            # assert) is re-raised at the yield; ``finally`` keeps the teardown
            # from being skipped in that case.
            await consumer.stop()
            print('Consumer stopped')
    return _make_consumer
@pytest.mark.asyncio
async def test_pending_todo_set(make_consumer, make_publisher, make_dispatcher_service):
    """Run a consumer in a task, cancel it, and let the context manager stop it."""
    # The fixture value is the factory itself: one call returns the async
    # context manager. Calling that result again raises TypeError.
    async with make_consumer() as consumer_1:
        tasks: List[asyncio.Task] = []
        tasks.append(asyncio.create_task(consumer_1.run()))
        for t in tasks:
            t.cancel()
        # return_exceptions=True collects the cancellations as results instead of raising.
        _ = await asyncio.gather(*tasks, return_exceptions=True)
One more option is to use @pytest_asyncio.fixture
from pytest_asyncio
and yield the desired value from there. In dummy code it could be:
@pytest_asyncio.fixture(scope='session')
async def get_my_value() -> AsyncIterator[SomeInstance]:
    """Session-scoped async fixture yielding the value bound by ``async with SomeInstance()``."""
    async with SomeInstance() as instance:
        # code before
        yield instance
        # code after