
Celery chain with dynamically-defined chord


I am trying to generate a chord from a list of signatures, but the length of that list is not known when the chain starts executing. For example, task_c returns a list of length n, and that list should produce a list of n Celery signatures to be executed in parallel. I can only access the result of the previous task through a partial signature (.s()), so how can I define this group of signatures dynamically inside my chain?

For example, how do I go from this:

import random

from celery import chain, chord

@task
def task_d(kwargs):
    return [task_e.si(i) for i in random.sample(range(10, 30), 5)]

ctask = chain(
    task_a.si(**kwargs),
    task_b.si(**kwargs),
    task_c.si(**kwargs),
    chord(
        [
            task_e.si(**kwargs),
            task_e.si(**kwargs),
            task_e.si(**kwargs),
        ],
        my_callback.si(**kwargs),
    ),
)

...to this:

ctask = chain(
    task_a.si(**kwargs),
    task_b.si(**kwargs),
    task_c.si(**kwargs),
    chord(
        [task_d.s(**kwargs)],  # <<<<<< ?
        my_callback.si(**kwargs),
    ),
)

Solution

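I think you have to build the chord inside a task that receives task_c's result, and launch it from there:

    from celery import chain, chord

    @task
    def task_d(list_of_length_n, **kwargs):
        # task_d receives task_c's return value as its first argument,
        # so the header can be sized to that list at runtime.
        tasks = [task_e.si(i) for i in list_of_length_n]
        # Build the chord and dispatch it; my_callback runs after every task_e finishes.
        chord(tasks, my_callback.si(**kwargs))()

    ctask = chain(
        task_a.si(**kwargs),
        task_b.si(**kwargs),
        task_c.si(**kwargs),
        task_d.s(**kwargs),  # forward kwargs so task_d can pass them to my_callback
    )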