asynchronouspytestpython-asynciopython-unittestpytest-asyncio

pytest-async synronization objects not working, tests finish, but async loop not waited on. It worked on unittest IsolatedAsyncioTestCase


I have the following code (I redacted it and made it a bit simpler).

class AbstractRateLimiter(ABC):
    @abstractmethod
    async def acquire(self, *args, **kwargs):
        pass

    @abstractmethod
    async def release(self):
        pass


class RateLimiterB(AbstractRateLimiter):
    def __init__(self, rpm: int = 0):
        # from aiolimiter import AsyncLimiter
        self.limiter = AsyncLimiter(rpm) if rpm > 0 else None

    async def acquire(self, *args, **kwargs):
        if self.limiter:
            await self.limiter.acquire()

    async def release(self):
        pass


class RateLimiterA(AbstractRateLimiter):
       
    def __init__(self, concurrency_limit: int = 0):
        self._semaphore = asyncio.Semaphore(concurrency_limit) if concurrency_limit > 0 else None

    async def acquire(self, *args, **kwargs):
        if self._semaphore:
            await self._semaphore.acquire()

    async def release(self):
        if self._semaphore:
            self._semaphore.release()

and the following code that uses these limiters:

async def execute_async(self, body: dict) -> ChatAdapterResponse:
    """
    Returns:
        ChatAdapterResponse: a ChatAdapterResponse from the model
    """        
    rate_limiter = RateLimiterA(1) # RateLimterA(1) / RateLimterB(2) both do not work, try any one of the two.
    if rate_limiter:
            await rate_limiter.acquire()
    try:
        # async send, is wrapper around requests
        response = await async_send_request(
            "SomeUrl", body, self._get_headers()
        )
        return response
    finally:
        if rate_limiter:
            await rate_limiter.release()

When the code was using unittest it works fine, and waits properly:

class TestRateLimiterA():
  def test_rate_limitA():
     for i in range(self.TEST_COUNT):
                start_time = asyncio.get_running_loop().time()
                res = await execute_async(
                    SOME_DICT
                )
                end_time = asyncio.get_running_loop().time()
                print(
                    f"Finished requests for {model} in {end_time - start_time} seconds"
                )
                # assert that from the second try onwards, rpm limit = 1 ensure that the request takes at least 60 seconds
                if i > 0:
                    self.assertTrue(end_time - start_time > 60)
                )

def test_rate_limiter_B():
    start_time = asyncio.get_running_loop().time()
    tasks = [
        async_execute_on_model(
            body
        )
        for body in bodies # array of 4 dicts
    ]
    results = await asyncio.gather(*tasks)
    end_time = asyncio.get_running_loop().time()
    print(f"Finished requests in {end_time - start_time} seconds")
    self.assertTrue(
        end_time - start_time > 5 * (len(bodys) - 1)
    )

However when I tried to switch to using pytest, it seems the async loop and waits are not longer waiting, and the following code executes very quickly and does not wait or respect the synchronization objects. Any idea why ?

async def test_rate_limit_A(mock_send_request, 1rpm_rate_limiterA):

      for i in range(TEST_COUNT):
          start_time = asyncio.get_running_loop().time()
          res = await async_execute_on_model(
              SomeDict
          )
            end_time = asyncio.get_running_loop().time()
            print(
                f"Finished requests for {dict} in {end_time - start_time} seconds"
            )
            # assert that from the second try onwards, rpm limit = 1 ensure that the request takes at least 60 seconds
            if i > 0:
                assert end_time - start_time > 60  

async def test_rate_limitB(mock_async_requestB, 1rpm_rate_limiterB):
    
    start_time = asyncio.get_running_loop().time()
    tasks = [
        async_execute_on_model(
            body
        )
        for body in bodies
    ]
    results = await asyncio.gather(*tasks)
    end_time = asyncio.get_running_loop().time()
    print(f"Finished requests in {end_time - start_time} seconds")
    
    assert end_time - start_time > 5 * (len(bodies) - 1)
    for res in results:

Why is this happening, and what am I doing wrong with pytest-async?


Solution

  • You're instantiating a new rate limiter for each request in execute_async. Each limiter will allow its single request to be executed immediately.

    You need to make sure the same limiter instance is used for all requests.