I have this example code:
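    import random
    import time

    @router.post('/test')
    def Test():
        # Enqueue ten tasks, all with the normal priority of 7.
        for i in range(0, 10):
            Testy.apply_async([i], priority=7)
        return "done"

    @app.task(bind=True)
    def Testy(self, i):
        try:
            testtest(i)
            return "Fine"
        except Exception as e:
            # Retry immediately, asking for priority 2 on the retried message.
            raise self.retry(countdown=0, priority=2)

    def testtest(i):
        try:
            print(i)
            time.sleep(2)
            # Fail at random, roughly one call in five.
            if random.randint(1, 5) == 3:
                raise ZeroDivisionError
        except:
            print("ERROR")
            raise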
In this code I randomly raise an exception. My normal task priority is 7, but my retried task priority is 2 (which is higher according to the Celery documentation), so I expect the retried task to run sooner than the remaining tasks.
I expect this output:
0
1
2
3
ERROR
3
4
5
6
7
ERROR
7
8
9
But I got this:
1
2
3
ERROR
4
5
6
7
ERROR
8
9
3
7
That means the retried task goes to the end of the queue even with a higher priority.
I checked the priority with:
    priority = Testy.request.delivery_info.get('priority')
    print(priority)
and it prints 7 and 2 correctly.
    Testy.apply_async([i], priority=2)
I also tried using another task (on the same queue) with a higher priority, so that in the except block the work goes to that task and runs sooner than the others. That didn't work either.
I tried these settings:
    MAX_TASK_PRIORITY = 10
    DEFAULT_TASK_PRIORITY = 7
    worker_prefetch_multiplier = 1
    task_acks_late = True
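For comparison, the lowercase setting names that Celery documents for queue priorities are `task_queue_max_priority` and `task_default_priority`. Whether `MAX_TASK_PRIORITY` and `DEFAULT_TASK_PRIORITY` above map onto them depends on how the project loads its configuration, which the post does not show. A minimal sketch of a configuration module follows, assuming a hypothetical `celeryconfig.py` loaded with `app.config_from_object("celeryconfig")`:

```python
# celeryconfig.py (hypothetical module name, not from the original post)
# Assumption: the app loads it with app.config_from_object("celeryconfig").

# Default x-max-priority argument for declared queues (RabbitMQ only).
task_queue_max_priority = 10

# Priority used when apply_async() is called without an explicit priority.
task_default_priority = 7

# Reserve only one message per worker process at a time, so fewer
# tasks are already held by the worker when a retry is published.
worker_prefetch_multiplier = 1

# Acknowledge a message after the task finishes instead of when it starts.
task_acks_late = True
```

With RabbitMQ, a queue that already exists without the `x-max-priority` argument keeps its original arguments, so it may need to be deleted and declared again before priorities take effect.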
I use celery==5.3.0 and rabbitmq==3.12.7.
The first issue was a misinterpretation of how priorities are set. I had read the Celery documentation as saying that lower numbers mean higher priority, so that tasks with priority 2 would be executed sooner than the others. That ordering applies to the Redis transport; with RabbitMQ, a larger number means a higher priority. However, even after correcting this misunderstanding, the problem persisted. Consequently, I switched the Celery broker from RabbitMQ to Redis, which ultimately resolved the issue.
I'm still uncertain why RabbitMQ failed to prioritize tasks correctly, but Redis is working correctly.
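The two orderings in the question can be compared with a small toy model. This is only a sketch: it uses an in-memory heap and a single worker that takes one task at a time, and it ignores prefetching, acknowledgements, and queue declaration, so it is not how RabbitMQ, Redis, or Celery work internally. The function `run` and its parameters are hypothetical names introduced for this illustration. It assumes two conventions: "larger number wins" (as RabbitMQ priority queues do) and "smaller number wins" (as Celery documents for the Redis transport).

```python
import heapq
from itertools import count


def run(failing, retry_priority, larger_wins, n=10, base_priority=7):
    """Return the sequence a single worker would print in this toy model."""
    seq = count()  # tie-breaker: first in, first out among equal priorities
    heap = []

    def push(task_id, priority, is_retry):
        # heapq pops the smallest key, so negate when larger numbers win.
        key = -priority if larger_wins else priority
        heapq.heappush(heap, (key, next(seq), task_id, is_retry))

    for i in range(n):
        push(i, base_priority, False)

    out = []
    while heap:
        _, _, task_id, is_retry = heapq.heappop(heap)
        out.append(task_id)
        # A task listed in `failing` fails once, then is re-queued as a retry.
        if task_id in failing and not is_retry:
            out.append("ERROR")
            push(task_id, retry_priority, True)
    return out


# Larger number wins: the priority-2 retries wait behind every priority-7 task.
print(run({3, 7}, retry_priority=2, larger_wins=True))
# [0, 1, 2, 3, 'ERROR', 4, 5, 6, 7, 'ERROR', 8, 9, 3, 7]

# Smaller number wins: each priority-2 retry runs right after its failure.
print(run({3, 7}, retry_priority=2, larger_wins=False))
# [0, 1, 2, 3, 'ERROR', 3, 4, 5, 6, 7, 'ERROR', 7, 8, 9]
```

In this model, the first ordering reproduces the observed output, with the retries at the end, and the second reproduces the expected output. That makes the observed order consistent with priority 2 being lower than 7 on RabbitMQ, though it does not by itself explain why the problem persisted after the priorities were corrected.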