I am working on a project building an API that is able to send the live location of vehicles to a frontend.
I get this location data by subscribing to a ZMQ stream by running a while loop. This is all working and if I just run my stream as a script I can print all kinds of information to the terminal (I'll store those in a database later on).
I also have the FastAPI server up and running
Now what I'd like to do is:
What happens instead:
Here is my code:
# General FastAPI Imports
from fastapi import Depends, FastAPI, Request
from data_collection.livestream import enable_data_stream
from client_service import client_api
app = FastAPI()
app.include_router(client_api.router, prefix="/API/V1")
@app.get('/')
def read_root(request: Request):
return {"Hello": "World"}
The Stream:
from gzip import GzipFile
from io import BytesIO
import zmq
import xml.etree.ElementTree as ET
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://SERVER")
subscriber.setsockopt(zmq.SUBSCRIBE)
while True:
multipart = subscriber.recv_multipart()
address = multipart[0]
try:
contents = GzipFile('', 'r', 0, BytesIO(multipart[1])).read()
root = ET.fromstring(contents)
print("Updates Received:")
# Gets the timestamp
print('time', root[3].text)
print('X Coord: ', root[4][0][12].text)
print('Y Coord: ', root[4][0][13].text)
I tried looking into the multiprocess and threading implementations for python but I'm unsure how those tie in with starting the FastAPI process (as that's enabled from Uvicorn)
In the example below, the server and worker are started in separate processes because the While loop won't resolve. It seems that you were on the right track. In my example, I have these functions in one file, but there are no restrictions on someone breaking them out into their own files:
import uvicorn
import multiprocessing
import time
import zmq
import xml.etree.ElementTree as ET
from gzip import GzipFile
from io import BytesIO
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
def server():
uvicorn.run(app, host="localhost", port=8000)
def worker():
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://SERVER")
subscriber.setsockopt(zmq.SUBSCRIBE)
while True:
multipart = subscriber.recv_multipart()
address = multipart[0]
try:
contents = GzipFile('', 'r', 0, BytesIO(multipart[1])).read()
root = ET.fromstring(contents)
print("Updates Received:")
# Gets the timestamp
print('time', root[3].text)
print('X Coord: ', root[4][0][12].text)
print('Y Coord: ', root[4][0][13].text)
except Exception as e:
print(e)
print("Error: %s" % multipart[1])
break
if __name__ == '__main__':
# Runs server and worker in separate processes
p1 = multiprocessing.Process(target=server)
p1.start()
time.sleep(1) # Wait for server to start
p2 = multiprocessing.Process(target=worker)
p2.start()
p1.join()
p2.join()