I'm using FastAPI's BackgroundTasks and trying to get it to execute background tasks in the order they were submitted. For example, if I hit my task-creating endpoint in the order request1 - request2 - request3, I expect the tasks to be executed in that same order. Unfortunately, they are executed as request1 - request3 - request2.
So it looks like the first task starts immediately (because nothing is running yet), and requests 2 and 3 are queued, but request 3 is then executed before request 2, i.e. Last In First Out. And if you send more requests in the meantime, they jump to the front (the execution order becomes request5 - request4 - request3 - request2).
I'd like BackgroundTasks to execute in FIFO order rather than LIFO. Any clue on that? I couldn't find anything in the FastAPI or Starlette documentation.
I created a simple bit of code to reproduce the bug. While sanity-checking it, I realised the behaviour is not the same across environments: the code below works normally (FIFO) on my Windows 10 machine (Python 3.8.2, FastAPI 0.98) but misbehaves (LIFO) on CentOS (Python 3.9.12, FastAPI 0.96).
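Since FastAPI/Starlette hand sync work off to anyio's threadpool, it is worth recording the exact library versions on each machine. A quick way to do this on both environments (importlib.metadata is in the standard library from Python 3.8):

python -c "from importlib.metadata import version; print(version('fastapi'), version('starlette'), version('anyio'))"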
App.py file:
from uuid import UUID
from pydantic import BaseModel
from typing import Dict
from anyio.lowlevel import RunVar
from anyio import CapacityLimiter
from http import HTTPStatus
import time
from fastapi import FastAPI, BackgroundTasks


class Job(BaseModel):
    uid: UUID
    status: str = 'in_progress'


# API init
app = FastAPI()
jobs: Dict[UUID, Job] = {}


def mock_function(job_id: UUID) -> None:
    # Simulate a long-running job.
    jobs[job_id].status = 'in_progress'
    time.sleep(5)


def process_request(job_id: UUID) -> None:
    mock_function(job_id)
    jobs[job_id].status = 'complete'


@app.on_event("startup")
def startup():
    # Limit how many background tasks run at once; Starlette executes sync
    # background tasks through anyio's default thread limiter.
    RunVar("_default_thread_limiter").set(CapacityLimiter(1))


@app.get("/status")
async def status_handler():
    return jobs


@app.post('/request/{uid}', status_code=HTTPStatus.ACCEPTED)
async def request_API(uid: UUID, background_tasks: BackgroundTasks):
    new_task = Job(uid=uid)
    new_task.status = 'in_queue'
    jobs[new_task.uid] = new_task
    background_tasks.add_task(process_request, new_task.uid)
    return new_task
Test file:
import requests
import time

query_tasks = {}
jobs_id = ['c880cc1b-dc27-4175-b616-29a69322d156',
           '4f0ea1f1-a5a3-4a7c-a114-17e57a1c55db',
           '15ba0d0a-3c94-4906-a331-054a3847171c']

# Submit the three jobs in order: request1, request2, request3.
for i in range(3):
    r = requests.post(f'http://127.0.0.1:8000/request/{jobs_id[i]}')
    id_ = r.json()['uid']
    query_tasks[id_] = i + 1

# Poll the status endpoint and print each job's submission rank and status.
for t in range(20):
    r = requests.get('http://127.0.0.1:8000/status')
    tasks = r.json()
    for k, v in tasks.items():
        print(query_tasks[k])
        print(v['status'])
    time.sleep(1)
    print('-----------------------------------------------')
Output:
1
complete
2
in_queue
3
in_progress
-----------------------------------------------
1
complete
2
in_progress
3
complete
(Some outputs have been removed for clarity)
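As a fallback, I also considered bypassing BackgroundTasks entirely and draining my own asyncio.Queue, which is FIFO by construction. Here is a minimal sketch of that idea (it reuses Job, jobs and process_request from App.py above; the queue and worker names are placeholders of my own):

import asyncio
from fastapi.concurrency import run_in_threadpool

job_queue = None  # created on startup so it binds to the running event loop

async def queue_worker():
    # Single consumer: jobs are taken strictly in insertion (FIFO) order.
    while True:
        job_id = await job_queue.get()
        # Run the blocking function in the threadpool so the event loop stays free.
        await run_in_threadpool(process_request, job_id)
        job_queue.task_done()

@app.on_event("startup")
async def start_worker():
    global job_queue
    job_queue = asyncio.Queue()
    asyncio.create_task(queue_worker())

@app.post('/request_fifo/{uid}', status_code=HTTPStatus.ACCEPTED)
async def request_fifo(uid: UUID):
    new_task = Job(uid=uid)
    new_task.status = 'in_queue'
    jobs[new_task.uid] = new_task
    await job_queue.put(new_task.uid)
    return new_task

A single consumer guarantees ordering; adding more worker tasks would trade that ordering for throughput.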
Any lead?
Thank you!
Edit: After a few tests, I changed the version of the anyio library from 3.5.0 to anything greater than 3.6.0 (in my case I used 3.7.0) and it worked!
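To make sure every deployment picks up the fix, I also pin the dependency explicitly (3.7.0 is the version I verified; adjust the specifier if you need a different range):

pip install --upgrade "anyio==3.7.0"

or in requirements.txt:

anyio==3.7.0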