My real world situation is that I want to get a list of campaigns from an api call and for each campaign trigger a chain of functions. Once all the chains are complete I need to call a function to report on the results.
I've tried to simplify this down as much as possible and have the following code. This runs, but the chord unlock function is called before the chains are complete. In this code it means that it can't sum the array of results.
import time
from celery import Celery, chain, chord, group
app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
@app.task
def generate():
return [1, 2, 3, 4, 5]
@app.task
def dmap(it, first, second):
chains = []
for arg in it:
c = chain(first.clone([arg, ]), second)
chains.append(c)
return group(chains)()
@app.task
def add(x, y):
print 'add {x} {y}'.format(x=x, y=y)
time.sleep(3)
return x + y
@app.task
def mul(x, y):
print 'mul {x} {y}'.format(x=x, y=y)
time.sleep(2)
return x * y
@app.task
def xsum(numbers):
print numbers
to_sum = []
for x in numbers[0]:
to_sum.append(x.result)
print to_sum
return sum(to_sum)
if __name__ == '__main__':
x = add.s(0)
y = mul.s(1)
workers = generate.si() | dmap.s(x, y)
result = chord(workers)(xsum.s())
print result.get()
The dmap
function was based on this answer. I've also seen this answer. The last link implies that what I want to do might not be possible since "there is nothing to synchronize with as the group happens in parallel."
I couldn't work out how to bend the solution to work when the generate
function returns an array rather than an single item.
The log from running the above shows the (early?) chord unlock and hence xsum
attempting to sum over an array of results where 3 are None
.
[2014-11-11 14:03:10,308: INFO/MainProcess] Received task: tasks.generate[2eedc847-ff67-4e0c-90e1-48314133bb51]
[2014-11-11 14:03:10,311: INFO/MainProcess] Received task: celery.chord_unlock[7d07e506-1aae-40e5-bd05-bbc53b286103] eta:[2014-11-11 14:03:11.307477+00:00]
[2014-11-11 14:03:10,338: INFO/MainProcess] Received task: tasks.dmap[0f2efa72-402d-412e-807e-bbf191850c18]
[2014-11-11 14:03:10,365: INFO/MainProcess] Task tasks.generate[2eedc847-ff67-4e0c-90e1-48314133bb51] succeeded in 0.0523488249746s: [1, 2, 3, 4, 5]
[2014-11-11 14:03:10,386: INFO/MainProcess] Received task: tasks.add[eccf5faa-069c-4634-826e-af5793a11c68]
[2014-11-11 14:03:10,388: WARNING/Worker-2] add 1 0
[2014-11-11 14:03:10,390: INFO/MainProcess] Received task: tasks.add[6b66167b-2767-4bde-a0a0-32f5fab7a961]
[2014-11-11 14:03:10,392: WARNING/Worker-1] add 2 0
[2014-11-11 14:03:10,394: INFO/MainProcess] Received task: tasks.add[d74659b0-b512-44f9-88b4-1908f79bfc52]
[2014-11-11 14:03:10,397: INFO/MainProcess] Received task: tasks.add[e9b3336f-9b37-4f25-81a7-cbac819da38c]
[2014-11-11 14:03:10,398: INFO/MainProcess] Received task: tasks.add[63b2ce22-1288-4cac-9018-8ddefaab575d]
[2014-11-11 14:03:10,399: WARNING/Worker-4] add 3 0
[2014-11-11 14:03:10,401: INFO/MainProcess] Task tasks.dmap[0f2efa72-402d-412e-807e-bbf191850c18] succeeded in 0.061700456019s: <GroupResult: 9a3972ff-0976-46d2-937f-9ea4a1ead56b [925ec9c3-09da-43c1-9b94-c04dbe67f195,...
[2014-11-11 14:03:10,402: WARNING/Worker-3] add 4 0
[2014-11-11 14:03:13,409: INFO/MainProcess] Received task: tasks.mul[f696aa0a-844f-4e81-9722-0693c6e8c344]
[2014-11-11 14:03:13,410: INFO/MainProcess] Received task: tasks.mul[538c3c60-67f8-409d-b4ce-bf09184aa03b]
[2014-11-11 14:03:13,418: INFO/MainProcess] Received task: tasks.mul[4ffb6d04-0cf2-4300-a0de-bf53acf6662d]
[2014-11-11 14:03:13,419: INFO/MainProcess] Received task: tasks.mul[925ec9c3-09da-43c1-9b94-c04dbe67f195]
[2014-11-11 14:03:13,436: INFO/MainProcess] Task tasks.add[d74659b0-b512-44f9-88b4-1908f79bfc52] succeeded in 3.03667491797s: 3
[2014-11-11 14:03:13,437: INFO/MainProcess] Task tasks.add[e9b3336f-9b37-4f25-81a7-cbac819da38c] succeeded in 3.03460178198s: 4
[2014-11-11 14:03:13,438: INFO/MainProcess] Task tasks.add[6b66167b-2767-4bde-a0a0-32f5fab7a961] succeeded in 3.04608612298s: 2
[2014-11-11 14:03:13,439: WARNING/Worker-4] mul 4 1
[2014-11-11 14:03:13,450: WARNING/Worker-2] add 5 0
[2014-11-11 14:03:13,452: INFO/MainProcess] Task tasks.add[eccf5faa-069c-4634-826e-af5793a11c68] succeeded in 3.06420573901s: 1
[2014-11-11 14:03:13,454: WARNING/Worker-3] mul 3 1
[2014-11-11 14:03:13,481: INFO/MainProcess] Task celery.chord_unlock[7d07e506-1aae-40e5-bd05-bbc53b286103] succeeded in 0.0413383140112s: None
[2014-11-11 14:03:13,485: INFO/MainProcess] Received task: tasks.xsum[575f5375-bf0f-4d41-b9a3-57661eaf4373]
[2014-11-11 14:03:15,470: INFO/MainProcess] Task tasks.mul[f696aa0a-844f-4e81-9722-0693c6e8c344] succeeded in 2.031282346s: 4
[2014-11-11 14:03:15,472: WARNING/Worker-1] mul 1 1
[2014-11-11 14:03:15,477: INFO/MainProcess] Task tasks.mul[538c3c60-67f8-409d-b4ce-bf09184aa03b] succeeded in 2.02354899806s: 3
[2014-11-11 14:03:15,479: WARNING/Worker-4] [<GroupResult: 9a3972ff-0976-46d2-937f-9ea4a1ead56b [925ec9c3-09da-43c1-9b94-c04dbe67f195, 4ffb6d04-0cf2-4300-a0de-bf53acf6662d, 538c3c60-67f8-409d-b4ce-bf09184aa03b, f696aa0a-844f-4e81-9722-0693c6e8c344, 82a6b814-53a5-45f1-a0dc-43885f92eca4]>]
[2014-11-11 14:03:15,555: WARNING/Worker-4] [None, None, 3, 4, None]
[2014-11-11 14:03:15,564: ERROR/MainProcess] Task tasks.xsum[575f5375-bf0f-4d41-b9a3-57661eaf4373] raised unexpected: TypeError("unsupported operand type(s) for +: 'int' and 'NoneType'",)
Traceback (most recent call last):
File "/home/duncan/VEnvs/adwords/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/home/duncan/VEnvs/adwords/local/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
return self.run(*args, **kwargs)
File "/home/duncan/projects/celerychordtest/tasks.py", line 47, in xsum
return sum(to_sum)
TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
[2014-11-11 14:03:16,460: INFO/MainProcess] Received task: tasks.mul[82a6b814-53a5-45f1-a0dc-43885f92eca4]
[2014-11-11 14:03:16,462: WARNING/Worker-3] mul 5 1
[2014-11-11 14:03:16,476: WARNING/Worker-2] mul 2 1
[2014-11-11 14:03:16,476: INFO/MainProcess] Task tasks.add[63b2ce22-1288-4cac-9018-8ddefaab575d] succeeded in 3.02716274199s: 5
[2014-11-11 14:03:17,480: INFO/MainProcess] Task tasks.mul[925ec9c3-09da-43c1-9b94-c04dbe67f195] succeeded in 2.00813938997s: 1
[2014-11-11 14:03:18,485: INFO/MainProcess] Task tasks.mul[4ffb6d04-0cf2-4300-a0de-bf53acf6662d] succeeded in 2.00837794197s: 2
[2014-11-11 14:03:18,471: INFO/MainProcess] Task tasks.mul[82a6b814-53a5-45f1-a0dc-43885f92eca4] succeeded in 2.009012155s: 5
I was hoping / expecting the process to wait until each chain chains is complete before the chord unlock is called.
Like @ChillarAnand suggested I ended up redesigning my tasks, however I did so to eliminate the need for a chord. I wanted the ability to have a group of chains, which meant I couldn't (as far as I could work out) combine this with a chord.
What I now do is to trigger the "final" task as part of triggering the group of chains. In order to make this work the final task has to check that the other tasks have completed. Since I know my last task (in my real world program) writes to a database I can simply check that I have a row in the database for each item that was generated.
For anyone facing a similar problem, the relevant parts of the final function look roughly like the following:
class NotReady(Exception):
pass
@shared_task(default_retry_delay=30, max_retries=10)
def output(generated_list):
list_from_db = query db ...
try:
raise_if_not_equal(list_from_db, generated_list)
except NotReady, exc:
raise current.retry(exc=exc, countdown=30)
... everything is ready do stuff ...
FWIW: I'll probably update the retry to backoff basing the code roughly on the following thread
This feels like a good answer, and crucially because this task throws an exception I never have a worker sat polling to find out if everything has completed.