djangocelerydjango-celerycelery-taskcelerybeat

How to specify celery beat periodic task?


I want to make a periodic task with Django and Celery. I have configured the celery in my project.

The project structure looks like this:

project
├── apps
│   ├── laws
│        └──tasks
│           └──periodic.py # the task is in this file
├── config
│   ├── celery.py
│   ├── settings
│      └── base.py # CELERY_BEAT_SCHEDULE defined in this file

base.py:

CELERY_BEAT_SCHEDULE = {
    "sample_task": {
        "task": "apps.laws.tasks.periodic.SampleTask", # the problem is in the line
        "schedule": crontab(minute="*/1"),
    },
}

periodic.py:

class SampleTask(Task):
    name="laws.sample_task"

    def run(self, operation, *args, **kwargs):
        logger.info("The sample task in running...")

Here is the error I get:

 The delivery info for this task is:
 {'exchange': '', 'routing_key': 'celery'}
 Traceback (most recent call last):
   File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
     strategy = strategies[type_]
 KeyError: 'apps.laws.tasks.periodic.SampleTask'

How can I define the route of the task correctly?


Solution

  • I solved it. instead of the route, just put the name of the task in settings:

    CELERY_BEAT_SCHEDULE = {
        "laws.sample_task": {
            "task": "laws.sample_task", # put the name here.
            "schedule": crontab(minute="*/1"),
        },
    }