pythonwhile-loopfastapizeromqpyzmq

How to listen to a TCP stream on my FastAPI server


I am working on a project building an API that is able to send the live location of vehicles to a frontend.

I get this location data by subscribing to a ZMQ stream by running a while loop. This is all working and if I just run my stream as a script I can print all kinds of information to the terminal (I'll store those in a database later on).

I also have the FastAPI server up and running

Now what I'd like to do is:

What happens instead:

Here is my code:

# General FastAPI Imports
from fastapi import Depends, FastAPI, Request
from data_collection.livestream import enable_data_stream
from client_service import client_api

app = FastAPI()

app.include_router(client_api.router, prefix="/API/V1")
@app.get('/')
def read_root(request: Request):
    return {"Hello": "World"}

The Stream:


from gzip import GzipFile
from io import BytesIO
import zmq
import xml.etree.ElementTree as ET




context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://SERVER")
subscriber.setsockopt(zmq.SUBSCRIBE)

while True:
    multipart = subscriber.recv_multipart()
    address = multipart[0]
    try:
        contents = GzipFile('', 'r', 0, BytesIO(multipart[1])).read()
        root = ET.fromstring(contents)
        print("Updates Received:")
        # Gets the timestamp
        print('time', root[3].text)
        print('X Coord: ', root[4][0][12].text)
        print('Y Coord: ', root[4][0][13].text)

I tried looking into the multiprocess and threading implementations for python but I'm unsure how those tie in with starting the FastAPI process (as that's enabled from Uvicorn)


Solution

  • In the example below, the server and worker are started in separate processes because the While loop won't resolve. It seems that you were on the right track. In my example, I have these functions in one file, but there are no restrictions on someone breaking them out into their own files:

    import uvicorn
    import multiprocessing
    import time
    import zmq
    import xml.etree.ElementTree as ET
    from gzip import GzipFile
    from io import BytesIO
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    async def root():
        return {"message": "Hello World"}
    
    
    def server():
        uvicorn.run(app, host="localhost", port=8000)
    
    
    def worker():
        context = zmq.Context()
        subscriber = context.socket(zmq.SUB)
        subscriber.connect("tcp://SERVER")
        subscriber.setsockopt(zmq.SUBSCRIBE)
    
        while True:
            multipart = subscriber.recv_multipart()
            address = multipart[0]
    
            try:
                contents = GzipFile('', 'r', 0, BytesIO(multipart[1])).read()
                root = ET.fromstring(contents)
                print("Updates Received:")
                # Gets the timestamp
                print('time', root[3].text)
                print('X Coord: ', root[4][0][12].text)
                print('Y Coord: ', root[4][0][13].text)
    
            except Exception as e:
                print(e)
                print("Error: %s" % multipart[1])
                break
    
    
    if __name__ == '__main__':
        # Runs server and worker in separate processes
        p1 = multiprocessing.Process(target=server)
        p1.start()
    
        time.sleep(1)  # Wait for server to start
    
        p2 = multiprocessing.Process(target=worker)
        p2.start()
    
        p1.join()
        p2.join()