pythonmultithreadingflask

Python Queue not updated outside of Thread


I've created a Flask app that retrieves data from a queue that is updated in a separate thread. I'm not sure why the Queue is empty when I retrieve it from the Flask GET endpoint, and am a bit ignorant of what Queues being thread-safe is supposed to mean, since my example doesn't appear to reflect that.

In the example below, the queue in the Flask @app.route('/measurements') measurements route is empty even though it's updated in the TCP message handler. If anyone can enlighten me I'd appreciate it.

I am running this on Ubuntu with python3 in case that's relevant.

from flask import Flask, render_template
import socket
from threading import Thread
import os
from wyze_sdk import Client
from dotenv import load_dotenv
from wyze_sdk.errors import WyzeApiError
import time
from queue import Queue

load_dotenv()

app = Flask(__name__)

start_time = time.time()

response = Client().login(
    email=os.environ['WYZE_EMAIL'],
    password=os.environ['WYZE_PASSWORD'],
    key_id=os.environ['WYZE_KEY_ID'],
    api_key=os.environ['WYZE_API_KEY']
)

client = Client(token=response['access_token'])

HOST = '192.168.1.207'  # Listen on all network interfaces
PORT = 9000  # The same port as in your ESP8266 sketch

PORT_UI = 7001

MIN_WATER_DIST = 20 # minimum distance from sensor to water in cm
MAX_WATER_DIST = 45 # maximum distance from sensor to water in cm
MAX_TIME_PLUG_ON = 600 # maximum amount of time plug should be on

# Initialize state variables
plug_on_time = None  # Track when the plug was turned on

measurements = Queue(maxsize=86400)

@app.route('/measurements')
def measurements_api():
    current_time = time.time()
    recent_measurements = [m for m in list(measurements.queue) if current_time - m['timestamp'] <= 86400] 
    return {'measurements': recent_measurements} # empty queue, no measurements returned

# This function will handle incoming TCP messages
def handle_tcp_connection(client_socket, client_address, measurements):
    try:
        data = client_socket.recv(1024)  # Buffer size of 1024 bytes
        if data:
            distance_str = data.decode('utf-8')
            dist = int(distance_str)
            print(f"Received message: {distance_str}")

        timestamp = time.time()
        print(len(measurements.queue)) # prints the correct number of measurements
        
        measurements.get()
        measurements.put({'value': value, 'timestamp': timestamp})

        client_socket.close()
    except Exception as e:
        print(f"Error: {e}")
        client_socket.close()

# This function runs the TCP server in a separate thread
def run_tcp_server(measurements):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.bind((HOST, PORT))
        server_socket.listen(5)
        print(f"Listening on {HOST}:{PORT} for TCP connections...")
        while True:
            client_socket, client_address = server_socket.accept()
            # Handle each incoming client connection in a new thread
            Thread(target=handle_tcp_connection, args=(client_socket, client_address, measurements)).start()

# Start the TCP server in a separate thread
tcp_server_thread = Thread(target=run_tcp_server, daemon=True, args=(measurements,))
tcp_server_thread.start()

@app.route('/')
def index():
    return render_template('index.html')

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=PORT_UI, debug=True)

Solution

  • Queue is thread-safe but not process-shared.

    Could you please show your uvicorn command for running server?

    Also, I see you using debug=True. This command involves reloading, which can create two processes.

    I could suggest you:

    1. Use debug=False:
    app.run(host='0.0.0.0', port=PORT_UI, debug=False)
    
    1. Confirming it’s a single process in router handlers
    print(os.getpid())