django celery django-celery-beat

Monitoring Celery tasks in Django


I am using django-celery-results and django-celery-beat in my Django app, with a React frontend. I'd like to create a component that monitors the status of every Celery task, but django-celery-results only shows the statuses STARTED, FAILURE, and SUCCESS. I want to display all possible statuses from my database. I am also using Flower, which works fine, but I need to save all statuses to my PostgreSQL database.

Question

What would be the best way to do this? I know that TaskResult only stores completed tasks, but how can I extend this library to store all statuses in my database? I configured Celery to report the STARTED status, but that is not enough. I need all possible states, especially RECEIVED.


Solution

  • Here is how you can monitor celery task executions in Django and store their status in the database.

    First you need an appropriate model to store the executions of tasks. Let's call them task logs:

    from django.db import models

    class TaskLog(models.Model):
        ...
    

    You can then use Celery signals (which are different from Django signals) to run code whenever a task is executed or reaches a certain state, and then update your database accordingly.

    For example, you can use the signals task_success, task_failure and task_retry to record the completion of a task execution, with its respective end state, as a new TaskLog object.

    from celery import signals
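    from .models import TaskLog

    # The create_from_*_signal methods used below are custom manager
    # methods that you define on TaskLog yourself.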
    
    
    @signals.task_success.connect
    def task_success_handler(sender=None, result=None, **kw):
        TaskLog.objects.create_from_success_signal(sender=sender, result=result)
    
    @signals.task_failure.connect
    def task_failure_handler(sender=None, task_id=None, exception=None, **kw):
        TaskLog.objects.create_from_failure_signal(sender, task_id, exception)
    
    @signals.task_retry.connect
    def task_retry_handler(sender=None, request=None, reason=None, **kw):
        TaskLog.objects.create_from_retry_signal(sender=sender, request=request, reason=reason)
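
The create_from_*_signal manager methods are not shown in the answer. As a rough sketch of what they might extract from each signal, the helpers below build plain dictionaries of field values. The field names (task_id, task_name, state, detail) and the helper names are hypothetical, and the success helper assumes the task id is available as sender.request.id.

```python
from types import SimpleNamespace

# Hypothetical TaskLog fields: task_id, task_name, state, detail.


def fields_from_success(sender, result):
    """Field values for a task that finished successfully."""
    return {
        "task_id": sender.request.id,  # assumption: sender is the task object
        "task_name": sender.name,
        "state": "SUCCESS",
        "detail": repr(result),
    }


def fields_from_failure(sender, task_id, exception):
    """Field values for a task that raised an exception."""
    return {
        "task_id": task_id,
        "task_name": sender.name,
        "state": "FAILURE",
        "detail": repr(exception),
    }


def fields_from_retry(sender, request, reason):
    """Field values for a task that is about to be retried."""
    return {
        "task_id": request.id,
        "task_name": sender.name,
        "state": "RETRY",
        "detail": str(reason),
    }


# Stand-in for a Celery task, used only to try the helpers without a worker.
fake_task = SimpleNamespace(
    name="app.tasks.add",
    request=SimpleNamespace(id="abc-123"),
)
print(fields_from_failure(fake_task, "abc-123", ValueError("boom")))
```

A custom manager method could then do something like self.create(**fields_from_failure(sender, task_id, exception)), provided the model has matching fields.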
    

    There are additional signals to cover other states (e.g. task_received for RECEIVED, or task_internal_error) and to collect additional data (e.g. task_prerun and task_postrun to measure task duration).