python celery priority-queue

Celery priority does not apply


I have this example code:

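import random
import time

# `router` (a web framework router) and `app` (the Celery application) are defined elsewhere.

@router.post('/test')
def Test():
    # Enqueue ten tasks, all with priority 7.
    for i in range(10):
        Testy.apply_async([i], priority=7)
    return "done"

@app.task(bind=True)
def Testy(self, i):
    try:
        testtest(i)
        return "Fine"
    except Exception:
        # Retry immediately, with priority 2.
        raise self.retry(countdown=0, priority=2)

def testtest(i):
    try:
        print(i)
        time.sleep(2)
        # Fail at random, roughly one call in five.
        if random.randint(1, 5) == 3:
            raise ZeroDivisionError
    except Exception:
        print("ERROR")
        raise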

In this code I randomly raise an exception. My normal task priority is 7, but my retried task priority is 2 (which is higher according to the Celery documentation), so I expect the retried task to run sooner than the remaining tasks.

I expect this output:

0
1
2
3
ERROR
3
4
5
6
7
ERROR
7
8
9

But I got this:

1
2
3
ERROR
4
5
6
7
ERROR
8
9
3
7

That means the retried tasks go to the end of the queue, even with a higher priority.

        priority = Testy.request.delivery_info.get('priority')
        print(priority)

This prints 7 and 2 for me, as expected.

Testy.apply_async([i],priority=2)
MAX_TASK_PRIORITY = 10
DEFAULT_TASK_PRIORITY = 7
worker_prefetch_multiplier = 1
task_acks_late = True

I use celery==5.3.0 and rabbitmq==3.12.7.


Solution

  • The first issue was a misinterpretation of how priorities are set. According to the Celery documentation, tasks with lower priority numbers are given higher priority, so tasks with priority 2 are executed sooner than the others. However, even after correcting this misunderstanding, the problem persisted. I therefore switched the Celery broker from RabbitMQ to Redis, which resolved the issue.

    I'm still not sure why RabbitMQ failed to prioritize the tasks correctly, but with Redis the priorities work as expected.
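
One possible explanation, not confirmed in this thread: the Celery documentation describes the priority direction as transport-dependent. With RabbitMQ a larger number means a higher priority, while the Redis transport reverses this so that 0 is the highest. Under that reading, a retry at priority 2 queued behind tasks at priority 7 is served last on RabbitMQ and first on Redis, which would match both outputs in the question. The sketch below is a toy single-worker model built on `heapq`, not Celery or broker code. The function `simulate` and its parameters are made up for illustration, and tasks 3 and 7 are hard-coded to fail once, as in the run shown in the question.

```python
import heapq
from itertools import count


def simulate(higher_number_wins, failing=(3, 7), n=10, normal=7, retry=2):
    """Return the order in which one worker would process the tasks.

    Hypothetical model: every task starts at priority `normal`; a task
    listed in `failing` fails once and is re-queued at priority `retry`.
    """
    seq = count()  # tie-breaker: keeps FIFO order within one priority

    def key(priority):
        # heapq pops the smallest key, so negate when larger numbers win.
        return -priority if higher_number_wins else priority

    queue = [(key(normal), next(seq), i) for i in range(n)]
    heapq.heapify(queue)

    pending_failures = set(failing)
    order = []
    while queue:
        _, _, i = heapq.heappop(queue)
        order.append(i)
        if i in pending_failures:
            # First attempt fails: re-queue once at the retry priority.
            pending_failures.discard(i)
            heapq.heappush(queue, (key(retry), next(seq), i))
    return order


rabbitmq_like = simulate(higher_number_wins=True)
redis_like = simulate(higher_number_wins=False)

print(rabbitmq_like)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 3, 7]
print(redis_like)     # [0, 1, 2, 3, 3, 4, 5, 6, 7, 7, 8, 9]
```

If this is what happened, then on RabbitMQ the thing to try would be a retry priority larger than the normal one (for example `priority=9` with normal tasks at 7). The queue also has to be declared with a maximum priority (the `x-max-priority` argument), otherwise RabbitMQ ignores message priorities.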