I'm a bit new in celery configs.
I have a task named myapp.tasks.my_task
for example.
I can see myapp.tasks.my_task
in registered tasks of celery when I use celery inspect registered
. doesn't it mean that the task is successfully registered? why it raises the following error for it:
KeyError celery.worker.consumer.consumer in on_task_received
Received unregistered task of type 'my_app.tasks.my_task'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
there are also other tasks in my_app.tasks
and they work correctly but only this task does not work and gets KeyError:
@shared_task(queue='celery')
def other_task():
""" WORKS """
...
@shared_task(queue='celery')
def my_task():
""" DOES NOT WORK """
...
Back in the day, when I faced the problem a senior solved it for me in a mushroom management way unfortunately (see more about this anti-pattern here). I came back to this problem recently to figure out the solution in our own project domain.
As Niel pointed in his/her solution, we were using celery_app.autodiscover_tasks()
in our project and in that case we should import my_task
in __init__.py
of tasks
package like bellow.
from .some_tasks_file import my_task
Also, we used celery beat and the task defined inside app.conf.beat_schedule
must have exact path to the function like bellow (even though the function is imported in __init__.py
of the tasks
package).
app.conf.beat_schedule = {
'MY_TASK': {
'task': 'myapp.tasks.some_tasks_file.my_task',
'schedule': 60, # every minute
},
}
Hope this would help people with the same celery configuration and problem.