I'm using Celery and RabbitMQ with this DLQ setup. (The workers run legacy jobs against an old web service.)
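For context, the DLQ part of the configuration looks roughly like this (a simplified sketch; the queue/exchange names and broker/backend URLs are placeholders, not my exact config):

from celery import Celery
from kombu import Exchange, Queue

app = Celery("myapp_workers", broker="amqp://...", backend="redis://...")

# The work queue dead-letters rejected/expired messages into a separate DLQ
# via RabbitMQ's x-dead-letter-* queue arguments.
app.conf.task_queues = [
    Queue(
        "legacy-web",
        Exchange("legacy-web", type="direct"),
        routing_key="legacy-web",
        queue_arguments={
            "x-dead-letter-exchange": "legacy-web.dlx",
            "x-dead-letter-routing-key": "legacy-web.dlq",
        },
    ),
    Queue(
        "legacy-web.dlq",
        Exchange("legacy-web.dlx", type="direct"),
        routing_key="legacy-web.dlq",
    ),
]
app.conf.task_default_queue = "legacy-web"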
Furthermore, I'm using the onfailure_reject decorator as per this suggestion. Raising a Reject works: my workload ends up in the DLQ, and I can see the payload in Redis after the retries are exhausted:
{
"status": "RETRY",
"result": {
"exc_type": "LegacyWebRetriableError",
"exc_message": [
"(myapp-retriable-failure) Operation failed.",
{
"status": "failure",
"message": "(myapp-retriable-failure) Operation failed.",
"exit_code": 17
}
],
"exc_module": "myapp_workers.legacy_exec_web"
},
"traceback": "Traceback (most recent call last):\n File \"/app/myapp_workers/celery_app.py\", line 126, in legacy_web\n result = run_legacy_web(probability)\n File \"/app/myapp_workers/legacy_exec_web.py\", line 95, in run_legacy_web\n raise LegacyWebRetriableError(result.message, result)\nmyapp_workers.legacy_exec_web.LegacyWebRetriableError: ('(myapp-retriable-failure) Operation failed.', LegacyWebResult(status=failure,exit_code=17))\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n File \"/opt/.venv/lib/python3.13/site-packages/celery/app/trace.py\", line 453, in trace_task\n R = retval = fun(*args, **kwargs)\n ~~~^^^^^^^^^^^^^^^^^\n File \"/opt/.venv/lib/python3.13/site-packages/celery/app/trace.py\", line 736, in __protected_call__\n return self.run(*args, **kwargs)\n ~~~~~~~~^^^^^^^^^^^^^^^^^\n File \"/app/myapp_workers/celery_app.py\", line 107, in _wrapper\n return f(self, *args, **kwargs)\n File \"/app/myapp_workers/celery_app.py\", line 134, in legacy_web\n self.retry(exc=e, countdown=2**self.request.retries)\n ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/opt/.venv/lib/python3.13/site-packages/celery/app/task.py\", line 764, in retry\n raise ret\ncelery.exceptions.Retry: Retry in 4s: LegacyWebRetriableError('(myapp-retriable-failure) Operation failed.', LegacyWebResult(status=failure,exit_code=17))\n",
"children": [
[
[
"2d62f0c0-fef6-4bc4-94be-3d484126788f",
null
],
null
]
],
"date_done": null,
"parent_id": "2d62f0c0-fef6-4bc4-94be-3d484126788f",
"task_id": "2d62f0c0-fef6-4bc4-94be-3d484126788f"
}
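For completeness, the task itself looks roughly like this (simplified and reconstructed from the traceback above; the probability default is made up, and acks_late is what I understand Reject needs to actually dead-letter the message):

from myapp_workers.legacy_exec_web import LegacyWebRetriableError, run_legacy_web

@app.task(bind=True, acks_late=True)
@onfailure_reject()
def legacy_web(self, probability=0.5):
    try:
        return run_legacy_web(probability)
    except LegacyWebRetriableError as e:
        # While retries remain this raises Retry (a TaskPredicate, which the
        # decorator re-raises untouched); once retries are exhausted it
        # re-raises the original exception, which the decorator rejects.
        self.retry(exc=e, countdown=2**self.request.retries)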
However, the caller that fetches the worker's result is still hitting a timeout, as it understandably waits for a result that will never come.
result = legacy_web.delay().get(timeout=TIMEOUT)
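Concretely, all the caller ever sees is Celery's TimeoutError once TIMEOUT elapses, because the stored state stays in RETRY and never becomes ready:

from celery.exceptions import TimeoutError as CeleryTimeoutError

try:
    result = legacy_web.delay().get(timeout=TIMEOUT)
except CeleryTimeoutError:
    # The task meta is stuck in RETRY (not a ready state), so get()
    # blocks for the full TIMEOUT and then gives up.
    result = None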
In the decorator, I'm trying to update the state to force an error:
from functools import wraps

from celery import states
from celery.exceptions import Reject, TaskPredicate


def onfailure_reject(requeue=False):
    def _decorator(f):
        @wraps(f)
        def _wrapper(self, *args, **kwargs):
            try:
                return f(self, *args, **kwargs)
            except TaskPredicate:
                raise  # Do not handle TaskPredicate like Retry or Reject
            except Exception as e:
                self.update_state(state=states.FAILURE)  # <-- this line is new
                raise Reject(str(e), requeue=requeue)
        return _wrapper
    return _decorator
This gets past the timeout issue, but now introduces two more issues: the worker logs RuntimeError: No active exception to reraise, and the stored result is missing its meta (the exception and the traceback), as shown below. I'm not sure how to properly set the meta, or if setting the FAILURE state is the right thing to do.
{
"status": "FAILURE",
"result": null, // <-- missing the exception (which contains my legacy result)
"traceback": null, // <-- missing traceback
"children": [],
"date_done": "2025-04-24T14:07:49.002092+00:00",
"parent_id": "cc6ff956-e151-4e09-ac22-c0092e240fd9",
"task_id": "cc6ff956-e151-4e09-ac22-c0092e240fd9"
}
I managed to preserve the result and traceback by doing the following in the decorator (similar to what the Celery code does internally):
import traceback
from functools import wraps

from celery import states
from celery.exceptions import Reject, TaskPredicate


def onfailure_reject(requeue=False):
    def _decorator(f):
        @wraps(f)
        def _wrapper(self, *args, **kwargs):
            try:
                return f(self, *args, **kwargs)
            except TaskPredicate:
                raise  # Do not handle TaskPredicate like Retry or Reject
            except Exception as e:
                # Store the serialized exception and formatted traceback so the
                # result backend keeps them alongside the FAILURE state.
                meta = self.backend.prepare_exception(e)
                tb = "".join(traceback.format_exception(e))
                self.update_state(state=states.FAILURE, meta=meta, traceback=tb)
                raise Reject(str(e), requeue=requeue)
        return _wrapper
    return _decorator
And an extra bonus is that the status is now FAILURE instead of RETRY, which is more apt for a dead letter (I guess?).