pythonmultithreadingfastapibackground-taskuvicorn

FastAPI python: How to run a thread in the background?


I'm making a server in python using FastAPI, and I want a function that is not related to my API, to run in background every 5 minutes (like checking stuff from an API and printing stuff depending on the response)

I've tried to make a thread that runs the function start_worker, but it doesn't print anything.

Does anyone know how to do so ?

def start_worker():
    print('[main]: starting worker...')
    my_worker = worker.Worker()
    my_worker.working_loop() # this function prints "hello" every 5 seconds

if __name__ == '__main__':
    print('[main]: starting...')
    uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
    _worker_thread = Thread(target=start_worker, daemon=False)
    _worker_thread.start()

Solution

  • Option 1

    You should start your Thread before calling uvicorn.run, as uvicorn.run is blocking the thread.

    import time
    import threading
    from fastapi import FastAPI
    import uvicorn
    
    app = FastAPI()
    class BackgroundTasks(threading.Thread):
        def run(self,*args,**kwargs):
            while True:
                print('Hello')
                time.sleep(5)
      
    if __name__ == '__main__':
        t = BackgroundTasks()
        t.start()
        uvicorn.run(app, host="0.0.0.0", port=8000)
    

    You could also start your thread using FastAPI's startup event, as long as it is ok to run before the application starts.

    @app.on_event("startup")
    async def startup_event():
        t = BackgroundTasks()
        t.start()
    

    Option 2

    You could instead use a repeating Event scheduler for the background task, as below:

    import sched, time
    from threading import Thread
    from fastapi import FastAPI
    import uvicorn
    
    app = FastAPI()
    s = sched.scheduler(time.time, time.sleep)
    
    def print_event(sc): 
        print("Hello")
        sc.enter(5, 1, print_event, (sc,))
    
    def start_scheduler():
        s.enter(5, 1, print_event, (s,))
        s.run()
    
    @app.on_event("startup")
    async def startup_event():
        thread = Thread(target=start_scheduler)
        thread.start()
    
    if __name__ == '__main__':
        uvicorn.run(app, host="0.0.0.0", port=8000)
    

    Option 3

    If your task is an async def function (see this answer for more details on def vs async def endpoints/background tasks in FastAPI), then you could add the task to the current event loop, using the asyncio.create_task() function. The create_task() function takes a coroutine object (i.e., an async def function) and returns a Task object (which can be used to await the task, if needed, or cancel it , etc). The call creates the task inside the event loop for the current thread, and executes it in the "background" concurrently with all other tasks in the event loop, switching between them at await points.

    It is required to have an event loop created before calling create_task(), and this is already created when starting the uvicorn server either programmatically (using, for instance, uvicorn.run(app)) or in the terminal (using, for instance, uvicorn app:app). Instead of using asyncio.create_task(), one could also use asyncio.get_running_loop() to get the current event loop, and then call loop.create_task().

    The example below uses the recently documented way for adding lifespan events (using a context manager), i.e., code that should be executed before the application starts up, as well as when the application is shutting down (see the documentation, as well as this answer and this answer for more details and examples). One could also still use the startup and shutdown events, as demonstrated in the previous options; however, those event handlers might be removed from future FastAPI/Starlette versions.

    from fastapi import FastAPI
    from contextlib import asynccontextmanager
    import asyncio
    
    
    async def print_task(s): 
        while True:
            print('Hello')
            await asyncio.sleep(s)
    
            
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Run at startup
        asyncio.create_task(print_task(5))
        yield
        # Run on shutdown (if required)
        print('Shutting down...')
    
    
    app = FastAPI(lifespan=lifespan)