pythonpython-requestsmockingpython-rq

Mocking code run inside an rq SimpleWorker


I have code which uses Python requests to kick off a task which runs in a worker that is started with rq. (Actually, the GET request results in one task which itself starts a second task. But this complexity shouldn't affect things, so I've left that out of the code below.) I already have a test which uses rq's SimpleWorker class to cause the code to run synchronously. This works fine. But now I'm adding requests_ratelimiter to the second task, and I want to be sure it's behaving correctly. I think I need to somehow mock the time.sleep() function used by the rate limiter, and I can't figure out how to patch it.

routes.py

@app.route("/do_work/", methods=["POST"])
def do_work():
    rq_job = my_queue.enqueue(f"my_app.worker.do_work", job_timeout=3600, *args, **kwargs)

worker.py

from requests_ratelimiter import LimiterSession

@job('my_queue', connection=redis_conn, timeout=3600, result_ttl=24 * 60 * 60)
def do_work():
    session = LimiterSession(per_second=1)
    r = session.get(WORK_URL)

test.py

import requests_mock

def test_get(client):
    # call the Flask function to kick off the task
    client.get("/do_work/")

    with requests_mock.Mocker() as m:
        # mock the return value of the requests.get() call in the worker
        response_success = {"result": "All good"}
        m.get(WORK_URL, json=response_success)
        
        worker = SimpleWorker([my_queue], connection=redis_conn)
        worker.work(burst=True)  # Work until the queue is empty

A test in requests_ratelimiter patches the sleep function using a target path of 'pyrate_limiter.limit_context_decorator.sleep', but that doesn't work for me because I'm not declaring pyrate_limiter at all. I've tried mocking the time function and then passing that into the LimiterSession, and that sort of works:

worker.py

from requests_ratelimiter import LimiterSession
from time import time

@job('my_queue', connection=redis_conn, timeout=3600, result_ttl=24 * 60 * 60)
def do_work():
    session = LimiterSession(per_second=1, time_function=time)
    r = session.get(WORK_URL)

test.py

import requests_mock

def test_get(client):
    # call the Flask function to kick off the task
    client.get("/do_work/")

    with patch("my_app.worker.time", return_value=None) as mock_time:
        with requests_mock.Mocker() as m:
            response_success = {"result": "All good"}
            m.get(URL, json=response_success)
        
            worker = SimpleWorker([my_queue], connection=redis_conn)
            worker.work(burst=True)  # Work until the queue is empty
        
        assert mock_time.call_count == 1

However, then I see time called many more times than sleep would be, so I don't get the info I need from it. And patching my_app.worker.time.sleep results in the error:

AttributeError: does not have the attribute 'sleep'

I have also tried patching the pyrate_limiter as the requests_ratelimiter testing code does:

    with patch(
        "my_app.worker.requests_ratelimiter.pyrate_limiter.limit_context_decorator.sleep", return_value=None
    ) as mock_sleep:

But this fails with:

ModuleNotFoundError: No module named 'my_app.worker.requests_ratelimiter'; 'my_app.worker' is not a package

How can I test and make sure the rate limiter is engaging properly?


Solution

  • The solution was indeed to use 'pyrate_limiter.limit_context_decorator.sleep', despite the fact that I wasn't importing it.

    When I did that and made the mock return None, I discovered that sleep() was being called tens of thousands of times because it's in a while loop.

    So in the end, I also needed to use freezegun and a side effect on my mock_sleep to get the behavior I wanted. Now time is frozen, but sleep() jumps the test clock forward synchronously and instantly by the amount of seconds passed as an argument.

    from datetime import timedelta
    from unittest.mock import patch
    
    import requests_mock
    from freezegun import freeze_time
    from rq import SimpleWorker
    
    
    def test_get(client):
        with patch("pyrate_limiter.limit_context_decorator.sleep") as mock_sleep:
            with freeze_time() as frozen_time:
                # Make sleep operate on the frozen time
                # See: https://github.com/spulec/freezegun/issues/47#issuecomment-324442679
                mock_sleep.side_effect = lambda seconds: frozen_time.tick(timedelta(seconds=seconds))
    
                with requests_mock.Mocker() as m:
                    m.get(URL, json=response_success)
                    worker = SimpleWorker([my_queue], connection=redis_conn)
                    worker.work(burst=True)  # Work until the queue is empty
    
                # The worker will do enough to get rate limited once
                assert mock_sleep.call_count == 1