Tags: python, multithreading, async-await, mqtt, fastapi

How to capture MQTT packages in background with FastAPI and Paho-MQTT?


I'm building an API using FastAPI and Paho-MQTT. Its purpose is to query data from the database and send it to an Android app while, in the background, it receives data from some devices over MQTT and saves it to MongoDB. The problem is that I can't receive MQTT data in the background because doing so blocks the API startup. I'd like to use the API routes normally while the background task continuously receives and saves the MQTT data.

Tried Approaches:

  1. I've tried using asyncio.create_task, but this prevents the API startup from completing, so I can't use the routes.
  2. I've read that uvicorn.run stops other threads, so I tried starting the capture before calling it.

Both approaches are implemented in app.py: the first in the startup function and the second in the if __name__ == "__main__" block.

I have an app.py file, which is the main file, and a mqtt.py file that handles incoming MQTT data.

app.py looks like this:

@app.on_event("startup")
async def startup_event():
    asyncio.create_task(mqtt_subscribe()) # Stops the startup flow when I try to run using create_task

# Some routes here...
@app.route("/")
async def root():
    return {"hello": "world"}


if __name__ == "__main__":
    mqtt_subscribe() # When I try running before uvicorn.run, doesn't start the subscription
    uvicorn.run(app, host="0.0.0.0", port=8000)

mqtt.py looks like this:

client = motor.motor_asyncio.AsyncIOMotorClient(URL)
db = client.collection

def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)
    # Set Connecting Client ID
    client = mqtt_client.Client(CLIENT_ID)
    # Set CA certificate
    client.tls_set(ca_certs=ROOT_CA_PATH)
    client.username_pw_set(USERNAME, PASSWORD)
    client.on_connect = on_connect
    client.connect(BROKER, PORT)
    return client

def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        # Perform necessary operations with the received data
        payload = msg.payload.decode('utf-8')
        data = json.loads(payload)
        result = db["devices"].insert_one(data)
        print("Document inserted! ID:", result.inserted_id)

    client.subscribe(TOPIC, qos=0)
    client.on_message = on_message

async def mqtt_subscribe():
    # Set up the MQTT client
    print("function subscribe")
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()

Output when I run with create_task(mqtt_subscribe()) in the startup function:

INFO:     Started server process [1126]
INFO:     Waiting for application startup.
function subscribe
Connected to MQTT Broker!

As you can see, the line "INFO: Application startup complete." never appears, so I can't use the API routes.

Thanks in advance for your help!


Solution

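  • The startup hangs because client.loop_forever() is a blocking call and mqtt_subscribe() never awaits anything, so once the task starts running it never hands control back to the event loop. You can run the MQTT loop in a background thread instead: client.loop_start() starts paho's network loop in its own thread and returns immediately. Rather than relying on the startup event, I would probably do something like this: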

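    import json

    import paho.mqtt.client as mqtt
    from fastapi import FastAPI

    # Most recent value received over MQTT; written by paho's network thread.
    latest_mqtt_value = None

    def on_message(client, userdata, message):
        # Called from the MQTT network thread for every incoming message.
        global latest_mqtt_value
        payload = json.loads(message.payload)
        latest_mqtt_value = payload['value']

    def create_app():
        app = FastAPI()
        # With paho-mqtt 2.x, pass a callback API version here,
        # e.g. mqtt.Client(mqtt.CallbackAPIVersion.VERSION2).
        client = mqtt.Client()
        client.on_message = on_message
        client.connect('localhost', 1883)
        client.subscribe('topic')
        # loop_start() runs the network loop in a background thread and
        # returns immediately, so it does not block the server.
        client.loop_start()

        @app.get("/")
        async def root():
            return {"value": latest_mqtt_value}

        return app

    app = create_app()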
    

    This is a runnable example. Put it in a file named example.py and run it like this:

    uvicorn example:app
    

    Then publish a message to the topic named topic with the payload {"value": 123}:

    mosquitto_pub -t topic -m '{"value": 123}'
    

    A subsequent fetch of http://localhost:8000 will return:

    {"value": 123}
    

    Etc.
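
One thing the example above sidesteps is the MongoDB write from the question. With loop_start(), on_message runs in paho's network thread, not in the event loop, and Motor's insert_one returns an awaitable, so calling it without await (as in the question's mqtt.py) most likely never stores anything. A possible way around this is to hand the coroutine to the server's event loop with asyncio.run_coroutine_threadsafe. The sketch below uses only the standard library so it can be run on its own: make_on_message, FakeMessage and the in-memory saved list are hypothetical stand-ins, and a plain thread plays the role of paho's network thread.

```python
import asyncio
import json
import threading


def make_on_message(loop, save):
    """Build a paho-style on_message callback that hands work to `loop`.

    `save` is any coroutine function that takes the decoded document.
    """
    def on_message(client, userdata, msg):
        data = json.loads(msg.payload.decode("utf-8"))
        # This runs in the MQTT network thread, not in the event loop,
        # so the coroutine is scheduled thread-safely instead of awaited.
        return asyncio.run_coroutine_threadsafe(save(data), loop)
    return on_message


class FakeMessage:
    """Stand-in for paho's message object; only `payload` is used here."""
    def __init__(self, payload: bytes):
        self.payload = payload


async def main():
    saved = []  # stand-in for the MongoDB collection

    async def save(data):
        # With Motor this would be: await db["devices"].insert_one(data)
        await asyncio.sleep(0)
        saved.append(data)

    loop = asyncio.get_running_loop()
    on_message = make_on_message(loop, save)
    futures = []

    def fake_network_thread():
        # Plays the role of the thread started by client.loop_start().
        for i in range(3):
            payload = json.dumps({"value": i}).encode("utf-8")
            futures.append(on_message(None, None, FakeMessage(payload)))

    thread = threading.Thread(target=fake_network_thread)
    thread.start()
    # Join in the default executor so the event loop stays free.
    await loop.run_in_executor(None, thread.join)
    # Wait until every scheduled save has finished.
    await asyncio.gather(*(asyncio.wrap_future(f) for f in futures))
    return saved


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In a real FastAPI app, the loop would be captured with asyncio.get_running_loop() inside a startup or lifespan handler, and the callback built there would be assigned to client.on_message before calling client.loop_start(). The callback in the sketch returns the future only so the demo can wait on it; paho ignores the return value.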