python-3.xpython-asynciofastapipython-telegram-botevent-loop

How to run another application within the same running event loop?


I want my FastAPI app to have access to always actual bot_data of python-telegram-bot. I need that so when i call some endpoint in FastAPI could, for example, send messages to all chats, stored somewere in bot_data.

As i understand the problem: bot.run_polling() and uvicorn.run(...) launch two independent async loops. And i need to run them in one.

UPD-1:
Thanks to @MatsLindh I created next function which i pass to main block, but it works inconsistent. Some times bot.run_polling() (gets correct loop and everything works, but other times and breaks with error that there are different loops):

import asyncio
from uvicorn import Config, Server
# --snip--
def run(app: FastAPI, bot:Application):
    # using get_event_loop leads to:
    # RuntimeError: Cannot close a running event loop
    # I guess it is because bot.run_polling()
    # calls loop.run_until_complete() different tasks
    # loop = asyncio.get_event_loop()
    loop = asyncio.new_event_loop()
    server = Server(Config(app=app, port=9001))
    loop.create_task(server.serve())

    t = Thread(target=loop.run_forever)
    t.start()

    bot.run_polling()

    t.join()
# --snip--
if __name__ == "__main__":
# --snip--
    run(f_app, bot_app)

Also i know I could decompose bot.run_polling() into several separate calls that are agregated inside, but I am sure it should work with just that shortuct funcion.

Initial

My simplified setup looks like below.

Initially I tried to run not with threads but with multiprocessing.Proccess, however in that way my bot_data was always empty - i assumed it is because bot data not shared between processes so whole thing must be in one process. And here I am failing in to run all these stuff in one async loop.

# main.py
# python3.10
# pip install fastapi[all] python-telegram-bot
from threading import Thread

import uvicorn
from telegram.ext import Application, ApplicationBuilder, PicklePersistence
from fastapi import FastAPI, Request

BOT_TOKEN = "telegram-bot-token"
MY_CHAT = 123456

class MyApp(FastAPI):
    def add_bot(self, bot_app: Application):
        self.bot_app = bot_app

async def post_init(app: Application):
    app.bot_data["key"] = 42

f_app = MyApp()

@f_app.get("/")
async def test(request: Request):
   app: MyApp = request.app
   bot_app: Application = app.bot_app
   val = bot_app.bot_data.get('key')
   print(f"{val=}")
   await bot_app.bot.send_message(MY_CHAT, f"Should be 42: {val}")


if __name__ == "__main__":
    pers = PicklePersistence("storage")
    bot_app = ApplicationBuilder().token(BOT_TOKEN).post_init(post_init).persistence(pers).build()
    f_app.add_bot(bot_app)

    t1 = Thread(target=uvicorn.run, args=(f_app,), kwargs={"port": 9001})
    t1.start()

    # --- Launching polling in main thread causes
    # telegram.error.NetworkError: Unknown error in HTTP implementation:
    # RuntimeError('<asyncio.locks.Event object at 0x7f2764e6fd00 [unset]> is bound to a different event loop')
    # message is sent and value is correct, BUT app breaks and return 500
    # bot_app.run_polling()

    # --- Launching polling in separate thread causes
    # RuntimeError: There is no current event loop in thread 'Thread-2 (run_polling)'.
    # t2 = Thread(target=bot_app.run_polling)
    # t2.start()

    # --- Launching with asyncio causes:
    # ValueError: a coroutine was expected, got <bound method Application.run_polling ...
    # import asyncio
    # t2 = Thread(target=asyncio.run, args=(bot_app.run_polling,))
    # t2.start()

    t1.join()
   

Solution

  • When calling uvicorn.run(), a new event loop is created (internally, asyncio.run() is being called—see the linked source code). When attemping to lunch another application after starting the uvicorn server (and hence, the FastAPI app)—or, vice versa—that also creates a new event loop, such as your Telegram bot app, that line of code to start the other application will not going to be reached, until the already running event loop is exited. This is because running an event loop is blokcing, meaning that it will block the calling thread until the event loop is terminated.

    If you also attempted running the other application (essentially, an event loop) within an app that is already using an event loop, or attempted calling asyncio.run() or there is more than one call to loop.run_until_complete() within the app, you would come across errors such as:

    > RuntimeError: Cannot run the event loop while another loop is running
    > RuntimeError: asyncio.run() cannot be called from a running event loop
    > RuntimeError: This event loop is already running
    

    There are a few ways to solve this. For demontration purposes, the solutions given below use a simple printing app as the second application that also creates an event loop. This app is as follows:

    printing_app.py

    import asyncio
    
    async def go():
        counter = 0
        while True:
            counter += 1
            print(counter)
            await asyncio.sleep(1)
    
           
    def run():
        asyncio.run(go())
    

    Solution 1

    You can use uvicorn.Server.serve() to run uvicorn from an already running async environment (see also the implementation of Config class for all the available parameters, i.e., host, port, etc.). First, use asyncio.new_event_loop() to create a new event loop and then set it as the current event loop for the current thread, using asyncio.set_event_loop(). Next, schedule the execution of the other asynchronous app, by using loop.create_task() and passing a coroutine to it (i.e., a coroutine object is the result of calling an async def function), not the method that executes the asyncio.run() function. In printing_app.py above, that is the go() function. The coroutine that is wrapped in the task may not run immediately. It is scheduled and will run as soon as the event loop finds an opportunity to execute the task—as described in this answer, this may happen when the currently-running coroutine reaches an await expression, as well as an async for or async with block, as these operations use await under the hood.

    Finally, use loop.run_until_complete() to run the uvicorn server, by passing the uvicorn.Server.serve() coroutine—if the argument passed to loop.run_until_complete() is a coroutine, it is wrapped in a Task (see the relevant implementation, as well as the documentation link above); hence, there is no need for one calling loop.create_task() on the coroutine this time. It will execute the provided task and block until it is completed.

    In the interest of clarity, asyncio.new_event_loop(), followed by asyncio.set_event_loop() and loop.run_until_complete() is what actually happens behind the scenes when using asyncio.run()—see the latest Python's Runner class implementation, as well as the implementation of the run() method in Python 3.10 (which might be more clear).

    P.S. One could alternatively create every task using create_task() and finally call loop.run_forever(), which will run the event loop forever, until it is explicitly stopped by calling its stop() method. On the other hand, loop.run_until_complete() will keep running until the task you passed to it is completed and the result is returned (or when an exception is raised). Depending on one's needs, as well as the nature of tasks that they have to execute, may choose between the two.

    Example 1

    from fastapi import FastAPI
    import printing_app
    import asyncio
    import uvicorn
    
    app = FastAPI()
    
    
    @app.get('/')
    def main():
        return 'Hello World!'
        
    
    def start_uvicorn(loop):
        config = uvicorn.Config(app, loop=loop)
        server = uvicorn.Server(config)
        loop.run_until_complete(server.serve())
        
    
    def start_printing_app(loop):
        loop.create_task(printing_app.go())  # pass go() (coroutine), not run() 
    
                
    if __name__ == '__main__':
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        start_printing_app(loop)
        start_uvicorn(loop)
    

    Example 2

    Since this is a FastAPI application, you could run the server as usual (using uvicorn.run(app)), and utilize FastAPI's/Starlette's Lifespan events to execute the second app at startup. To execute it, you can use asyncio.create_task(), which will wrap the coroutine into a task, as explained earlier, and schedule its execution. The task will be executed in the loop returned by asyncio.get_running_loop(), which returns the event loop in the current thread. Alternatively, you could call asyncio.get_running_loop() yourself to get the running event loop, and then use the create_task() function, as mentioned earlier, to execute the task.

    from fastapi import FastAPI
    from contextlib import asynccontextmanager
    import asyncio
    import printing_app
    import uvicorn
    
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        asyncio.create_task(printing_app.go())
        # Alternatively:
        #loop = asyncio.get_running_loop()
        #loop.create_task(printing_app.go())
        yield
    
    
    app = FastAPI(lifespan=lifespan)
    
    
    @app.get('/')
    def main():
        return 'Hello World!'
        
    
    if __name__ == '__main__':
        uvicorn.run(app)
    

    Example 3

    Another variation would be to use asyncio.run() to create an async environment to run the app, then call asyncio.create_task() to start the other application, and finally, use await server.serve() to start the uvicorn server—any further code after that last part would be executed once the uvicorn server has finished running or forced to exit (e.g., when pressing CTRL + C).

    from fastapi import FastAPI
    import asyncio
    import printing_app
    import uvicorn
    
    app = FastAPI()
    
    
    @app.get('/')
    def main():
        return 'Hello World!'
    
        
    async def main():
        # start printing app
        asyncio.create_task(printing_app.go())
        
        # start uvicorn server
        config = uvicorn.Config(app)
        server = uvicorn.Server(config)
        await server.serve()
     
     
    if __name__ == '__main__':
        asyncio.run(main())
    

    Solution 2

    Another solution would be to use nest_asyncio, as demonstrated here, which allows running multiple asyncio event loops in nested environments. However, it is generally recommended to avoid using nested event loops, as it could lead to unexpected behavior.


    Running Telegram Bot app within FastAPI app

    As mentioned in this comment on github by a maintainer of the relevant library, using Application.run_polling() is purely optional and would block the event loop until the user sends a stop signal; that is what makes run_polling() unsuitable when combined with ASGI frameworks, such as FastAPI. In that case, you can just manually call the methods that run_polling() actually runs behind the scenes. An example showing how to run uvicorn server on Starlette application, along with a telegram-bot application, can be seen here. Based on that example and all the information provided earlier, the following solutions are provided.

    Example 1

    from fastapi import FastAPI
    import asyncio
    import uvicorn
    
    app = FastAPI()
    
    
    @app.get('/')
    def main():
        return 'Hello World!'
        
    
    async def main():
        config = uvicorn.Config(app, host='0.0.0.0', port=8000)
        server = uvicorn.Server(config)
        
        application = .... # initialize your telegram-bot app
        
        # Run application and webserver together
        async with application:
            await application.start()
            await server.serve()
            await application.stop()
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    Example 2

    from fastapi import FastAPI
    from contextlib import asynccontextmanager
    import uvicorn
    
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        application = .... # initialize your telegram-bot app
        await application.start()
        yield
        await application.stop()
    
      
    app = FastAPI(lifespan=lifespan)
    
    
    @app.get('/')
    def main():
        return 'Hello World!'
    
        
    if __name__ == '__main__':
        uvicorn.run(app)