I have code which uses Python requests to kick off a task which runs in a worker that is started with rq. (Actually, the GET request results in one task which itself starts a second task. But this complexity shouldn't affect things, so I've left that out of the code below.) I already have a test which uses rq's SimpleWorker class to cause the code to run synchronously. This works fine. But now I'm adding requests_ratelimiter to the second task, and I want to be sure it's behaving correctly. I think I need to somehow mock the time.sleep()
function used by the rate limiter, and I can't figure out how to patch it.
routes.py
@app.route("/do_work/", methods=["POST"])
def do_work():
rq_job = my_queue.enqueue(f"my_app.worker.do_work", job_timeout=3600, *args, **kwargs)
worker.py
from requests_ratelimiter import LimiterSession
@job('my_queue', connection=redis_conn, timeout=3600, result_ttl=24 * 60 * 60)
def do_work():
session = LimiterSession(per_second=1)
r = session.get(WORK_URL)
test.py
import requests_mock
def test_get(client):
# call the Flask function to kick off the task
client.get("/do_work/")
with requests_mock.Mocker() as m:
# mock the return value of the requests.get() call in the worker
response_success = {"result": "All good"}
m.get(WORK_URL, json=response_success)
worker = SimpleWorker([my_queue], connection=redis_conn)
worker.work(burst=True) # Work until the queue is empty
A test in requests_ratelimiter
patches the sleep function using a target path of 'pyrate_limiter.limit_context_decorator.sleep'
, but that doesn't work for me because I'm not declaring pyrate_limiter
at all. I've tried mocking the time
function and then passing that into the LimiterSession, and that sort of works:
worker.py
from requests_ratelimiter import LimiterSession
from time import time
@job('my_queue', connection=redis_conn, timeout=3600, result_ttl=24 * 60 * 60)
def do_work():
session = LimiterSession(per_second=1, time_function=time)
r = session.get(WORK_URL)
test.py
import requests_mock
def test_get(client):
# call the Flask function to kick off the task
client.get("/do_work/")
with patch("my_app.worker.time", return_value=None) as mock_time:
with requests_mock.Mocker() as m:
response_success = {"result": "All good"}
m.get(URL, json=response_success)
worker = SimpleWorker([my_queue], connection=redis_conn)
worker.work(burst=True) # Work until the queue is empty
assert mock_time.call_count == 1
However, then I see time
called many more times than sleep
would be, so I don't get the info I need from it. And patching my_app.worker.time.sleep
results in the error:
AttributeError: does not have the attribute 'sleep'
I have also tried patching the pyrate_limiter
as the requests_ratelimiter
testing code does:
with patch(
"my_app.worker.requests_ratelimiter.pyrate_limiter.limit_context_decorator.sleep", return_value=None
) as mock_sleep:
But this fails with:
ModuleNotFoundError: No module named 'my_app.worker.requests_ratelimiter'; 'my_app.worker' is not a package
How can I test and make sure the rate limiter is engaging properly?
The solution was indeed to use 'pyrate_limiter.limit_context_decorator.sleep'
, despite the fact that I wasn't importing it.
When I did that and made the mock return None
, I discovered that sleep()
was being called tens of thousands of times because it's in a while
loop.
So in the end, I also needed to use freezegun
and a side effect on my mock_sleep
to get the behavior I wanted. Now time is frozen, but sleep()
jumps the test clock forward synchronously and instantly by the amount of seconds passed as an argument.
from datetime import timedelta
from unittest.mock import patch
import requests_mock
from freezegun import freeze_time
from rq import SimpleWorker
def test_get(client):
with patch("pyrate_limiter.limit_context_decorator.sleep") as mock_sleep:
with freeze_time() as frozen_time:
# Make sleep operate on the frozen time
# See: https://github.com/spulec/freezegun/issues/47#issuecomment-324442679
mock_sleep.side_effect = lambda seconds: frozen_time.tick(timedelta(seconds=seconds))
with requests_mock.Mocker() as m:
m.get(URL, json=response_success)
worker = SimpleWorker([my_queue], connection=redis_conn)
worker.work(burst=True) # Work until the queue is empty
# The worker will do enough to get rate limited once
assert mock_sleep.call_count == 1