I'm trying to convert an existing Celery group call into a chord to prevent deadlocks. The previous code had retries and an expiration time. I managed to get the chord working without those settings, but when I try to apply them I don't see the tasks being run. I didn't see anything in the documentation about applying the same settings to the chord as a whole. I'm running Celery version 3.1.6.
Previous code:
jobs = group([reset_device.s(topoid, dev_list[i],
                             waittime_list[i], skipflag) for i in range(len(dev_list))]
             ).apply_async(expires=waittime, retry=True, retry_policy={
                 'max_retries': 3,
                 'interval_start': 0.5,
                 'interval_step': 0.2,
                 'interval_max': 0.2})
results = jobs.join_native(timeout=waittime + 600, propagate=True)
Working chord (without settings):
jobs = chord([reset_device.s(topoid, dev_list[i],
                             waittime_list[i], skipflag) for i in range(len(dev_list))])(callback)
Non-working chord #1:
jobs = chord([reset_device.s(topoid, dev_list[i], waittime_list[i], skipflag)
              .set(expires=datetime.now() + timedelta(seconds=waittime))
              .set(retry=True)
              .set(retry_policy=retry_policy)
              for i in range(len(dev_list))])(callback)
Non-working chord #2:
jobs = chord([reset_device.subtask(args=(topoid, dev_list[i], waittime_list[i], skipflag),
                                   expires=datetime.now() + timedelta(seconds=waittime),
                                   retry=True, retry_policy=retry_policy)
              for i in range(len(dev_list))])(callback)
In both #1 and #2, the tasks in the chord don't appear to run. How can I apply an expiration time and retries to each of the tasks called in the chord?
I figured it out; it was a mix of problems.
The first problem was that, within a chord (and possibly within groups and chains too), the expires option did not accept an integer, only a datetime object, even though the documentation makes no such distinction. This was fixed in a later version: I tested with 3.1.25 and verified the fix.
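On 3.1.6, where chord members only accepted a datetime for expires, an integer timeout such as waittime has to be turned into an absolute timestamp first. A minimal sketch, assuming the worker reads naive timestamps as UTC (the default CELERY_ENABLE_UTC = True); expires_at is a hypothetical helper, not part of Celery:

```python
from datetime import datetime, timedelta, timezone


def expires_at(seconds, now=None):
    """Hypothetical helper: turn a relative timeout into a naive UTC datetime.

    `now` can be injected for testing; by default the current UTC time is used.
    The result has no tzinfo, which is what datetime.utcnow() would also return.
    """
    if now is None:
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    return now + timedelta(seconds=seconds)


# Illustrative use with the names from the question (not executed here):
# reset_device.s(topoid, dev_list[i], waittime_list[i], skipflag).set(
#     expires=expires_at(waittime))
```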
The second problem was that Celery 3.1.6 does not log errors raised within chords (and, I think, within groups and chains too). This was also fixed later: with 3.1.25 I was able to see the failure.
The third problem was related to the error message:
[2017-08-07 18:39:56,043: ERROR/Worker-5] Chord '98246849-0d5d-4be2-85e3-3fc08e90011d' raised: TaskRevokedError(u'expired',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/builtins.py", line 90, in unlock_chord
    ret = j(timeout=3.0, propagate=propagate)
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 691, in join_native
    raise value
TaskRevokedError: expired
This was because the time zone was wrong: I had used datetime.now() instead of datetime.utcnow(). Switching to datetime.utcnow() resolves the issue and works in 3.1.6.
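To see why datetime.now() produced a timestamp that was already expired, here is a small model of the comparison, assuming (as described in this answer) that with CELERY_ENABLE_UTC = True a naive expires value is read as UTC. The UTC-7 offset and the fixed clock are hypothetical, chosen only to keep the result deterministic; worker_sees_expired is an illustration, not Celery's code.

```python
from datetime import datetime, timedelta, timezone

LOCAL_TZ = timezone(timedelta(hours=-7))  # hypothetical local zone, west of UTC


def worker_sees_expired(expires_naive, utc_now):
    """Model of the check when CELERY_ENABLE_UTC = True: the naive
    timestamp is treated as UTC and compared with the current UTC time."""
    return utc_now > expires_naive.replace(tzinfo=timezone.utc)


utc_now = datetime(2017, 1, 1, 18, 0, tzinfo=timezone.utc)      # fixed "current" time
local_naive = utc_now.astimezone(LOCAL_TZ).replace(tzinfo=None)  # like datetime.now()
utc_naive = utc_now.replace(tzinfo=None)                          # like datetime.utcnow()

waittime = 600
print(worker_sees_expired(local_naive + timedelta(seconds=waittime), utc_now))  # True
print(worker_sees_expired(utc_naive + timedelta(seconds=waittime), utc_now))    # False
```

In a zone east of UTC the same mistake flips direction: the task would stay valid for longer than intended instead of expiring before it runs.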
Alternatively, I could have set the Celery config CELERY_ENABLE_UTC = False, which is True by default. This was confusing me because we had set CELERY_TIMEZONE to the local time. When expires is given a datetime object, it is interpreted as either local time or UTC depending on the value of CELERY_ENABLE_UTC. I recommend keeping the two settings consistent with each other.
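A sketch of that alternative as a Celery 3.1-style config module. The zone name is a placeholder, and this assumes datetime.now() on the machine that builds the chord returns time in that same zone:

```python
# celeryconfig.py: Celery 3.1 setting names; the zone below is a placeholder.
CELERY_TIMEZONE = 'America/Los_Angeles'
# With UTC disabled, a naive datetime passed as `expires` is read as local time,
# so datetime.now() + timedelta(seconds=waittime) no longer expires early.
CELERY_ENABLE_UTC = False
```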
Interestingly enough, the callback is still created and polls to see whether the chord is done even though the chord tasks never execute, and it just stays there forever. I believe this may have been fixed in Celery 4.1.