python · cron · python-asyncio · scheduler · rq

Scheduling periodic RQ tasks using asyncio


I think the architecture of RQ Scheduler is fundamentally flawed, and that it's far more complicated than it needs to be. Here's why:

Schedules are stored in Redis
Even if you remove or modify your scheduling code, the old schedules remain in Redis until explicitly canceled.

Non-declarative model
Simply deleting the scheduler.cron(...) line does not remove the schedule; you must manually run scheduler.cancel(job_id).

rqscheduler doesn’t run your code
The rqscheduler command only polls Redis for due jobs. It does not import or execute your Python code to dynamically update schedules.

No built-in reconciliation
You must handle the lifecycle of scheduled jobs—adding, updating, or removing them—on your own, often requiring extra scripts or manual processes.

I strongly prefer Celery Beat's approach: dispatching/enqueuing tasks from my Python code exactly when they are scheduled to run.

However, in this particular project I'm using RQ, so Celery Beat is unavailable to me.
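For contrast, Celery Beat's schedule is plain declarative configuration via its documented `beat_schedule` setting; in this fragment, `app` (a `Celery` instance) and the `tasks.full_sync` task path are assumed:

```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    'full-sync-nightly': {
        'task': 'tasks.full_sync',              # assumed task path
        'schedule': crontab(hour=4, minute=0),  # every day at 04:00
    },
}
```

Because Beat rereads this dict on startup, removing an entry removes the schedule, which is the behavior I want to replicate on top of RQ.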

How can I create something simple that takes cron strings and works in a similar way to Celery Beat?


Solution

    # scheduler.py
    
    from __future__ import annotations
    
    import asyncio
    from collections.abc import Callable
    from datetime import datetime
    from typing import Any, TypedDict, Unpack
    
    from croniter import croniter
    import logfire
    import rq
    
    
    queue = rq.Queue(connection=...)
    _schedulers: list[Callable] = []
    
    
    def run_rq_scheduler():
        """
        This should only be called once, by a single instance, of a single service.
        """
        for s in _schedulers:
            s()
    
    
    class ScheduleRqJobKwargs(TypedDict, total=False):
        """
        See the description of `rq.Queue.enqueue` for details, or the documentation:
        https://python-rq.org/docs/#enqueueing-jobs
        """

        job_timeout: int | None
        result_ttl: int | None
        ttl: int | None
        failure_ttl: int | None
        description: str | None
        job_id: str | None
        at_front: bool
        meta: dict | None
        retry: rq.Retry | None
        on_success: rq.Callback | Callable[..., Any] | None
        on_failure: rq.Callback | Callable[..., Any] | None
        on_stopped: rq.Callback | Callable[..., Any] | None
    
    
    def schedule_rq_job(cron: str, **kwargs: Unpack[ScheduleRqJobKwargs]):
        def inner(fn: Callable):
            def scheduler():
                fn_name = f'{fn.__module__}.{fn.__qualname__}'
                logfire.info(f'Scheduling RQ Job: {cron} | {fn_name}')
    
                async def coro():
                    while True:
                        now = datetime.now()
                        # Next firing strictly after `now`.
                        next_: datetime = croniter(cron, now).get_next(datetime)
                        delta = next_ - now
                        await asyncio.sleep(delta.total_seconds())
    
                        logfire.info(f'Enqueuing RQ Job: {cron} | {fn_name}')
                        queue.enqueue(fn_name, **kwargs)
    
                asyncio.create_task(coro())  # noqa: RUF006
    
            _schedulers.append(scheduler)
    
            return fn
    
        return inner
    
    
    # -----------------------------------------------
    # All jobs must be scheduled here, in this file.
    # -----------------------------------------------
    
    
    # Examples: 
    @schedule_rq_job('0 4 * * *')
    async def full_sync():
        ...
    
    
    @schedule_rq_job('@hourly')
    async def process_restock_subscriptions():
        ...
    
    async def main():
        # Schedulers call `asyncio.create_task`, so they must be started
        # from inside a running event loop.
        run_rq_scheduler()
        # Keep the loop alive forever so the scheduled tasks keep running.
        await asyncio.Event().wait()


    if __name__ == '__main__':
        asyncio.run(main())
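The sleep computation at the heart of the loop can be checked in isolation. In this stdlib-only sketch, `next_run` stands in for the datetime croniter would return, and `seconds_until` is a hypothetical helper that clamps the result at zero so a firing time that has just passed never produces a negative sleep:

```python
from datetime import datetime


def seconds_until(next_run: datetime, now: datetime) -> float:
    # Mirrors the loop body: sleep exactly until the next cron firing,
    # clamped so a just-passed firing yields a zero-second sleep.
    return max((next_run - now).total_seconds(), 0.0)


now = datetime(2025, 1, 1, 3, 59, 30)
next_run = datetime(2025, 1, 1, 4, 0, 0)
print(seconds_until(next_run, now))  # 30.0
```

Recomputing `now = datetime.now()` on every iteration, as the loop above does, keeps drift from accumulating across firings.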