celery

calling update_state before raising Reject


I'm using Celery and RabbitMQ with this DLQ setup. (The workers run legacy jobs against an old web service.)

I'm also using the onfailure_reject decorator, as per this suggestion.
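
The task itself is wired up roughly like this (a simplified reconstruction from the traceback below; max_retries, acks_late, and the argument default are illustrative, not the exact code):

@app.task(bind=True, acks_late=True, max_retries=3)
@onfailure_reject()
def legacy_web(self, probability=1.0):
    try:
        return run_legacy_web(probability)
    except LegacyWebRetriableError as e:
        # Back off exponentially; once retries are exhausted, retry() re-raises
        # the exception, which the decorator turns into Reject(requeue=False)
        # so the message is dead-lettered (Reject requires late acks).
        self.retry(exc=e, countdown=2**self.request.retries)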

Raising Reject works. My workload ends up in the DLQ, and once the retries are exhausted I can see this payload in Redis:

{
    "status": "RETRY",
    "result": {
        "exc_type": "LegacyWebRetriableError",
        "exc_message": [
            "(myapp-retriable-failure) Operation failed.",
            {
                "status": "failure",
                "message": "(myapp-retriable-failure) Operation failed.",
                "exit_code": 17
            }
        ],
        "exc_module": "myapp_workers.legacy_exec_web"
    },
    "traceback": "Traceback (most recent call last):\n  File \"/app/myapp_workers/celery_app.py\", line 126, in legacy_web\n    result = run_legacy_web(probability)\n  File \"/app/myapp_workers/legacy_exec_web.py\", line 95, in run_legacy_web\n    raise LegacyWebRetriableError(result.message, result)\nmyapp_workers.legacy_exec_web.LegacyWebRetriableError: ('(myapp-retriable-failure) Operation failed.', LegacyWebResult(status=failure,exit_code=17))\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/opt/.venv/lib/python3.13/site-packages/celery/app/trace.py\", line 453, in trace_task\n    R = retval = fun(*args, **kwargs)\n                 ~~~^^^^^^^^^^^^^^^^^\n  File \"/opt/.venv/lib/python3.13/site-packages/celery/app/trace.py\", line 736, in __protected_call__\n    return self.run(*args, **kwargs)\n           ~~~~~~~~^^^^^^^^^^^^^^^^^\n  File \"/app/myapp_workers/celery_app.py\", line 107, in _wrapper\n    return f(self, *args, **kwargs)\n  File \"/app/myapp_workers/celery_app.py\", line 134, in legacy_web\n    self.retry(exc=e, countdown=2**self.request.retries)\n    ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/opt/.venv/lib/python3.13/site-packages/celery/app/task.py\", line 764, in retry\n    raise ret\ncelery.exceptions.Retry: Retry in 4s: LegacyWebRetriableError('(myapp-retriable-failure) Operation failed.', LegacyWebResult(status=failure,exit_code=17))\n",
    "children": [
        [
            [
                "2d62f0c0-fef6-4bc4-94be-3d484126788f",
                null
            ],
            null
        ]
    ],
    "date_done": null,
    "parent_id": "2d62f0c0-fef6-4bc4-94be-3d484126788f",
    "task_id": "2d62f0c0-fef6-4bc4-94be-3d484126788f"
}

However, the caller that fetches the worker's result still hits a timeout, as it (understandably) waits for a result that will never arrive:

result = legacy_web.delay().get(timeout=TIMEOUT)
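
As far as I can tell, get() only unblocks once the backend stores a ready state (SUCCESS or FAILURE). Since the stored state stays RETRY (see the payload above), the call just sits there until the timeout fires:

from celery.exceptions import TimeoutError

async_result = legacy_web.delay()
try:
    result = async_result.get(timeout=TIMEOUT)
except TimeoutError:
    # The task never reaches a ready state, so get() can only time out
    print(async_result.state)  # still "RETRY"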

In the decorator, I'm now trying to update the task state to force a FAILURE:

from functools import wraps

from celery import states
from celery.exceptions import Reject, TaskPredicate


def onfailure_reject(requeue=False):
    def _decorator(f):
        @wraps(f)
        def _wrapper(self, *args, **kwargs):
            try:
                return f(self, *args, **kwargs)
            except TaskPredicate:
                raise  # Do not handle TaskPredicate like Retry or Reject
            except Exception as e:
                self.update_state(state=states.FAILURE)  # <-- this line is new
                raise Reject(str(e), requeue=requeue)

        return _wrapper

    return _decorator

This gets past the timeout, but it introduces two new problems:

{
    "status": "FAILURE",
    "result": null,   // <-- missing the exception (which contains my legacy result)
    "traceback": null, // <-- missing traceback
    "children": [],
    "date_done": "2025-04-24T14:07:49.002092+00:00",
    "parent_id": "cc6ff956-e151-4e09-ac22-c0092e240fd9",
    "task_id": "cc6ff956-e151-4e09-ac22-c0092e240fd9"
}
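
As far as I can tell, that's because update_state() is now called with no meta and no traceback, so it effectively overwrites the stored result with something like:

self.backend.store_result(self.request.id, None, states.FAILURE)  # result=None, traceback=None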

Solution

  • I managed to preserve the result and traceback by doing the following in the decorator (similar to what the Celery code does internally):

    import traceback  # needed for traceback.format_exception below

    def onfailure_reject(requeue=False):
        def _decorator(f):
            @wraps(f)
            def _wrapper(self, *args, **kwargs):
                try:
                    return f(self, *args, **kwargs)
                except TaskPredicate:
                    raise  # Do not handle TaskPredicate like Retry or Reject
                except Exception as e:
                    meta = self.backend.prepare_exception(e)
                    tb = "".join(traceback.format_exception(e))
                    self.update_state(state=states.FAILURE, meta=meta, traceback=tb)
                    raise Reject(str(e), requeue=requeue)
    
            return _wrapper
    
        return _decorator
    

    As a bonus, the status is now FAILURE instead of RETRY, which seems more apt for a dead letter.
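
    On the caller side, get() now returns promptly instead of timing out. Roughly what I see (the exact exception object depends on how the result backend rehydrates it on the consumer side):

    async_result = legacy_web.delay()
    try:
        async_result.get(timeout=TIMEOUT)
    except Exception as exc:
        print(async_result.state)      # "FAILURE", so get() raises immediately
        print(async_result.traceback)  # the traceback stored via update_state()
        print(exc)                     # the rehydrated exception, legacy payload included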