Schedules are stored in Redis
Even if you remove or modify your scheduling code, the old schedules remain in Redis until explicitly canceled.
Non-declarative model
Simply deleting the scheduler.cron(...) line does not remove the schedule; you must manually run scheduler.cancel(job_id).
rqscheduler doesn't run your code
The rqscheduler command only polls Redis for due jobs. It does not import or execute your Python code to dynamically update schedules.
No built-in reconciliation
You must handle the lifecycle of scheduled jobs—adding, updating, or removing them—on your own, often requiring extra scripts or manual processes.
However, in this particular project I'm using RQ, so Celery Beat is unavailable to me.
How can I create something simple that takes cron strings and works in a similar way to Celery Beat?
# scheduler.py
from __future__ import annotations
import asyncio
from collections.abc import Callable
from datetime import datetime
from typing import Any, TypedDict, Unpack
from croniter import croniter
import logfire
import rq
# Queue that `schedule_rq_job` enqueues due jobs onto.
# NOTE(review): `connection=...` passes a literal Ellipsis -- presumably a
# placeholder for a real Redis connection; confirm before running.
queue = rq.Queue(connection=...)
# Scheduler callbacks registered by `schedule_rq_job`; `run_rq_scheduler`
# calls each of them.
_schedulers: list[Callable] = []
def run_rq_scheduler():
    """
    Start every schedule registered in `_schedulers`, in registration order.

    This should only be called once, by a single instance, of a single service.
    The callbacks registered by `schedule_rq_job` use `asyncio.create_task`,
    so call this from code running inside an event loop.
    """
    for start_schedule in _schedulers:
        start_schedule()
class ScheduleRqJobKwargs(TypedDict, total=False):
    """
    Optional keyword options accepted by `schedule_rq_job`.

    Every key is optional (`total=False`). For the meaning of each option see
    the description of `rq.Queue.enqueue_call` or the documentation:
    https://python-rq.org/docs/#enqueueing-jobs
    """

    # Identification and metadata.
    job_id: str | None
    description: str | None
    meta: dict | None

    # Timing-related options.
    timeout: int | None
    ttl: int | None
    result_ttl: int | None
    failure_ttl: int | None

    # Queue placement and retry policy.
    at_front: bool
    retry: rq.Retry | None

    # Callbacks.
    on_success: rq.Callback | Callable[..., Any] | None
    on_failure: rq.Callback | Callable[..., Any] | None
    on_stopped: rq.Callback | Callable[..., Any] | None
# Strong references to the background scheduling tasks. The event loop only
# keeps weak references to tasks, so a task nobody else references could be
# garbage collected while it is still sleeping.
_background_tasks: set[asyncio.Task] = set()


def schedule_rq_job(cron: str, **kwargs: Unpack[ScheduleRqJobKwargs]):
    """
    Decorator that registers a function to be enqueued on a cron schedule.

    Decorating a function does not start anything: it only appends a scheduler
    callback to `_schedulers`. When that callback is invoked (see
    `run_rq_scheduler`) it starts a background asyncio task that sleeps until
    the next occurrence of `cron`, enqueues the function on `queue` by its
    dotted path, and repeats. The decorated function is returned unchanged.

    The callback uses `asyncio.create_task`, so it must be invoked from code
    running inside an event loop.

    NOTE(review): the dotted path is built from `fn.__module__`; when this
    module is executed as a script that is `__main__`, which workers may not
    be able to import -- confirm how the scheduler process is launched.

    :param cron: cron expression understood by `croniter`.
    :param kwargs: job options forwarded to `rq.Queue.enqueue_call`.
    """

    def inner(fn: Callable):
        fn_name = f'{fn.__module__}.{fn.__qualname__}'

        async def coro():
            # Instant the next occurrence is computed from.
            base = datetime.now()
            while True:
                next_: datetime = croniter(cron).get_next(datetime, base, False)
                delay = (next_ - datetime.now()).total_seconds()
                if delay > 0:
                    await asyncio.sleep(delay)
                logfire.info(f'Enqueuing RQ Job: {cron} | {fn_name}')
                try:
                    # `enqueue_call` takes the job options (`timeout`, `ttl`, ...)
                    # as explicit keywords. `enqueue` expects `job_timeout`
                    # and would hand a `timeout` key to the job function.
                    queue.enqueue_call(fn_name, **kwargs)
                except Exception:
                    # One failed enqueue must not end the schedule for good;
                    # log it and wait for the next occurrence.
                    logfire.exception(f'Failed to enqueue RQ Job: {cron} | {fn_name}')
                # Never recompute from before the occurrence just handled: if
                # the sleep woke slightly early, starting from "now" would
                # yield the same occurrence again and enqueue it twice.
                # Starting from "now" when we are late skips missed runs.
                base = max(next_, datetime.now())

        def scheduler():
            logfire.info(f'Scheduling RQ Job: {cron} | {fn_name}')
            task = asyncio.create_task(coro())
            # Hold the task until it finishes so it cannot be collected.
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)

        _schedulers.append(scheduler)
        return fn

    return inner
# -----------------------------------------------
# All jobs must be scheduled here, in this file.
# -----------------------------------------------
# Examples:
@schedule_rq_job('0 4 * * *')
async def full_sync():
    """Example job registered with the cron expression `0 4 * * *` (04:00 every day); the body is a placeholder."""
    ...
@schedule_rq_job('@hourly')
async def process_restock_subscriptions():
    """Example job registered with the cron alias `@hourly`; the body is a placeholder."""
    ...
if __name__ == '__main__':

    async def _main() -> None:
        """Start all registered schedules, then keep the event loop alive."""
        # Must run inside the loop: the scheduler callbacks registered by
        # `schedule_rq_job` call `asyncio.create_task`, which raises
        # RuntimeError when no event loop is running.
        run_rq_scheduler()
        # Wait on an event that is never set so the background tasks keep
        # running until the process is interrupted.
        await asyncio.Event().wait()

    asyncio.run(_main())