I am trying to generate a chord from a list of signatures whose length is not known when the chain starts to execute.
E.g. the output of task_c is a list of length n, and that list should produce a list of the same length of Celery signatures to be executed in parallel. I can only access the result of the previous task through a partial signature, .s(), so how can I dynamically define this group of signatures inside my chain?
E.g. how do I go from this:
import random

@task
def task_d(kwargs):
    return [task_e.si(i) for i in random.sample(range(10, 30), 5)]

ctask = chain(
    task_a.si(**kwargs),
    task_b.si(**kwargs),
    task_c.si(**kwargs),
    chord(
        [
            task_e.si(**kwargs),
            task_e.si(**kwargs),
            task_e.si(**kwargs),
        ],
        my_callback.si(**kwargs),
    ),
)
...to this:
ctask = chain(
    task_a.si(**kwargs),
    task_b.si(**kwargs),
    task_c.si(**kwargs),
    chord(
        [task_d.s(**kwargs)],  # <<<<<< ?
        my_callback.si(**kwargs),
    ),
)
I think you have to build the chord inside a task, once the list actually exists, like this:
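@task
def task_d(list_of_length_n, **kwargs):
    # Build one signature per item, now that the list is available.
    tasks = [task_e.si(i) for i in list_of_length_n]
    # Launch the chord from inside the task; kwargs are forwarded to the callback.
    chord(tasks, my_callback.si(**kwargs))()

ctask = chain(
    task_a.si(**kwargs),
    task_b.si(**kwargs),
    task_c.si(**kwargs),
    # .s() is partial, so task_c's result arrives as the first argument.
    task_d.s(**kwargs),
)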