I'm trying to use nested chord's in Celery, but can't get it to work.
The use-case I have is a for first run a single task, the output of the then feeds into a group of multiple tasks, the output of that group is then intended to feed into another single task.
To debug I started with a minimal application inspired by this old issue: https://github.com/celery/celery/issues/4161
My test code is
#!/usr/bin/env python
from celery import Celery
app = Celery('canvastest', backend='redis://', broker='redis://')
@app.task
def i(x):
return x
to run it I do:
celery -A canvastest shell
celery -A canvastest worker
docker run -p 6379:6379 redis
Inside the interactive shell I can the reproduce the example from the issue linked above as follows
Python 3.10.6 (main, Nov 14 2022, 16:10:14) [GCC 11.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> chord([i.s(1), i.s(2)])(group(i.s(), i.s())).get(timeout=5)
[[1, 2], [1, 2]]
>>>
transforming that to match the first half of what I'm trying to do also works
>>> chord(i.s(1))(group(i.s(), i.s())).get(timeout=5)
[[1], [1]]
>>>
now trying to expand that so the output is sent into a single task at the end is where it falls appart
>>> chord(chord(i.s(1))(group(i.s(), i.s())))(i.s()).get(timeout=5)
Traceback (most recent call last):
File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/bin/shell.py", line 71, in _invoke_default_shell
import IPython # noqa
ModuleNotFoundError: No module named 'IPython'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/bin/shell.py", line 74, in _invoke_default_shell
import bpython # noqa
ModuleNotFoundError: No module named 'bpython'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 1377, in __call__
return self.apply_async((), {'body': body} if body else {}, **options)
File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 1442, in apply_async
return self.run(tasks, body, args, task_id=task_id, **merged_options)
File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 1506, in run
header_result_args = header._freeze_group_tasks(group_id=group_id, chord=body, root_id=root_id)
File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 1257, in _freeze_group_tasks
results = list(self._freeze_unroll(
File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 1299, in _freeze_unroll
yield task.freeze(group_id=group_id,
File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 304, in freeze
return self.AsyncResult(tid)
File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/kombu/utils/objects.py", line 30, in __get__
return super().__get__(instance, owner)
File "/usr/lib/python3.10/functools.py", line 981, in __get__
val = self.func(instance)
File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 471, in AsyncResult
return self.type.AsyncResult
AttributeError: 'AsyncResult' object has no attribute 'AsyncResult'
Am I just writing the code in the wrong way, or am I hitting some bug/limitation where it's not possible to feed the output from a chord into another chord?
Using celery 5.2.7 and redis 7.0.7
Four months later I found the answer to my problem. Calling it as
chord(header)(callback)
directly invokes it, if it is to be part of a chain the syntax changes to chord(header, callback)