python multithreading collections deque pyqtgraph

Serial Real Time Plotter python


The code below plots numbers received from a serial port. However, the values of locals()[self.axes_mapping[axis]] do not include all the data received by the function receive_data; some values seem to be lost. If I print the values of x, they match the received data. How do I make sure that all the received data is plotted?

import sys
from PyQt6.QtWidgets import QApplication, QMainWindow
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore
import numpy as np
from collections import deque
import threading
import queue
import serial

# Number of samples collected into one buffer before it is queued for plotting.
BUFFER_SIZE = 100

class RealTimePlot(QMainWindow):
    """Main window that plots values read from a serial port.

    A background thread (``receive_data``) collects samples into buffers of
    ``BUFFER_SIZE`` values and puts each full buffer on ``self.data_queue``.
    A ``QTimer`` calls ``update_plot`` on the GUI side to drain that queue and
    refresh one curve per entry in ``self.axes``.
    """

    def __init__(self):
        """Build the UI, open the serial port and start the receiver and timer."""
        super().__init__()

        self.setup_ui()
    
        # Port name and baud rate are hard-coded.
        self.serial_port = serial.Serial('COM6', 115200)
        self.N = 100  # number of points kept per curve
        self.fs = 1000  # Sampling frequency in Hz (adjust according to your setup)
        self.T = 1/self.fs  # sample period in seconds
        # X coordinates shared by every curve: 0, T, 2T, ...
        self.x_values = np.arange(0, self.N*self.T, self.T)

        # Circular buffers for time domain plots
        self.axes = ['X', 'RMS']

        # One fixed-length deque per axis, pre-filled with N zeros.
        self.z_values = {axis: deque([0] * self.N, maxlen=self.N) for axis in self.axes}
        # NOTE(review): z_index is not read anywhere in this class.
        self.z_index = {axis: 0 for axis in self.axes}

        # Axes variable mapping: plot name -> name of the local variable that
        # update_plot looks up through locals().
        self.axes_mapping = {'X': 'x', 'RMS': 'rms'}

        # Plotting setup
        self.setup_plots()

        self.data_queue = queue.Queue()
        # Lock for synchronizing access to the data queue
        self.data_queue_lock = threading.Lock()

        # Create and start the receiving thread
        self.receive_thread = threading.Thread(target=self.receive_data)
        self.receive_thread.daemon = True
        self.receive_thread.start()

        # Start the animation: update_plot is called on every timer timeout.
        self.timer = QtCore.QTimer(self)
        self.timer.timeout.connect(self.update_plot)
        self.timer.start(10)

    def setup_ui(self):
        """Create the pyqtgraph layout widget and make it the central widget."""
        self.central_widget = pg.GraphicsLayoutWidget()
        self.setCentralWidget(self.central_widget)

    def setup_plots(self):
        """Create one stacked plot and one curve for each name in ``self.axes``."""
        # One plot per axis, placed in consecutive rows of column 0.
        self.plots = {axis: self.central_widget.addPlot(row=i, col=0, title=f"<span style='color: #ffffff; font-weight: bold; font-size: 15px'>Time Domain - {axis} Axis</span>")
                      for i, axis in enumerate(self.axes)}

        for plot in self.plots.values():
            plot.setLabel('bottom', 'Time', 's')
            # plot.setLabel('left', 'Amplitude', 'g')
            plot.setYRange(-2, 5000)


        linha1 = pg.mkPen((52, 255, 52), width=2) # R G B & width
        # linha4 = pg.mkPen((255, 255, 255), width=2) 
        # Every curve is drawn with the same pen.
        self.lines = {axis: plot.plot(pen=linha1) for axis, plot in self.plots.items()}
        # self.lines_fft = {axis: plot.plot(pen=linha4) for axis, plot in self.plots_fft.items()}
        # Overrides the range set in the loop above for the RMS plot only.
        self.plots['RMS'].setYRange(0, 5000)

    def receive_data(self):
        """Read samples from the serial port forever and queue them in buffers.

        This is the target of ``self.receive_thread``. Each sample is two
        bytes decoded as a little-endian unsigned integer. Once
        ``BUFFER_SIZE`` samples have been stored, the buffer is put on
        ``self.data_queue`` and a fresh buffer is started.
        """
        data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
        data_cnt = 0
        # NOTE(review): rx_flag starts as True, so if no data is waiting on the
        # first pass the branch below reads data_str before it is assigned.
        rx_flag = True
        while True:
            if self.serial_port.in_waiting > 0.0:  # Check if there is data waiting
                    data_str =  int.from_bytes(self.serial_port.read(2), byteorder='little', signed = False)
                    rx_flag = True

            # NOTE(review): the lock is taken on every pass of this loop, even
            # when nothing was received, and the loop never sleeps.
            with self.data_queue_lock:
                if rx_flag == True:
                    data_buffer[data_cnt] = data_str
                    data_cnt = data_cnt+1
                    rx_flag = False

                # Check if the buffer size is reached, then update the plot
                if data_cnt >= BUFFER_SIZE:
                    self.data_queue.put(data_buffer)
                    # Start a new array so the queued one is not overwritten.
                    data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
                    data_cnt = 0
                    

    def calculate_rms(self, x):
        """Return the root mean square of ``x`` (sqrt of the mean of squares)."""
        return np.sqrt(np.mean(np.square([x])))

    def update_plot(self):
        """Drain the data queue and refresh every curve.

        Connected to ``self.timer.timeout``. Returns the curve items held in
        ``self.lines``.
        """
        with self.data_queue_lock:
            while not self.data_queue.empty():
                data_buffer = self.data_queue.get()

                # NOTE(review): x is rebound on every iteration, so after this
                # loop it holds only the last sample of the buffer; rms is
                # recomputed each time from the same whole buffer.
                for data_str in data_buffer:
                    x = data_str
                    rms = self.calculate_rms(data_buffer)

                for axis in self.axes:
                    # Looks up the local variable named by axes_mapping ('x' or
                    # 'rms') and appends that single value, i.e. one point per
                    # axis for each queued buffer.
                    self.z_values[axis].append(locals()[self.axes_mapping[axis]])
                    self.lines[axis].setData(self.x_values, self.z_values[axis])
                    print(locals()[self.axes_mapping[axis]])
        return self.lines.values()
    

def closeEvent(self, event):
    """Close the CSV file if one is open, then accept the close event.

    Args:
        self: Object that may carry a ``csv_file`` attribute with a
            ``close()`` method. Nothing in this file creates that attribute,
            so its absence is tolerated instead of raising ``AttributeError``.
        event: Close event; its ``accept()`` method is always called.
    """
    csv_file = getattr(self, 'csv_file', None)
    if csv_file is not None:
        csv_file.close()
    event.accept()

def main():
    """Create the Qt application, show the plot window and run the event loop.

    The process exits with the status returned by the event loop.
    """
    qt_app = QApplication(sys.argv)
    plot_window = RealTimePlot()
    plot_window.show()
    exit_code = qt_app.exec()
    sys.exit(exit_code)

# Launch the GUI only when this file is executed as a script.
if __name__ == '__main__':
    main()

The example is from here

https://www.reddit.com/r/embedded/comments/18h9smt/attempted_switch_from_dpc_blue_screen_error_while/


Solution

  • You have a bug. This code

    # Original code: after this loop x holds only the last sample.
    for data_str in data_buffer:
        x = data_str  # x = one item, drops the rest
        rms = self.calculate_rms(data_buffer)  # same argument on every iteration
    
    for axis in self.axes:
        # append() adds a single value per axis for the whole buffer
        self.z_values[axis].append(locals()[self.axes_mapping[axis]])
        self.lines[axis].setData(self.x_values, self.z_values[axis])
    

    should be

    x = data_buffer  # x = all items
    rms = [self.calculate_rms(data_buffer)]  # one-element list so extend() can consume it
    
    for axis in self.axes:  # note the use of extend for list/array
        # extend() adds every element of the looked-up sequence to the deque
        self.z_values[axis].extend(locals()[self.axes_mapping[axis]])
        self.lines[axis].setData(self.x_values, self.z_values[axis])
    

    Also, the two plots should use different time scales, since each point in the second one is the RMS of a 100-sample buffer from the first one.


    Lastly, the use of locals() is strongly discouraged: it is brittle and gets in the way of future refactoring or modification of the code. I'd probably just use a dictionary instead.

    # Keys are the names used as values in self.axes_mapping ('x' and 'rms').
    new_plot_data = {}  # replaces locals()
    new_plot_data["x"] = data_buffer
    new_plot_data["rms"] = [self.calculate_rms(data_buffer)]
    

    Full example (without using pyserial; the serial data is simulated with random numbers):

    import random
    import time
    import sys
    from PyQt6.QtWidgets import QApplication, QMainWindow
    import pyqtgraph as pg
    from pyqtgraph.Qt import QtCore
    import numpy as np
    from collections import deque
    import threading
    import queue
    import serial
    
    # Number of samples collected into one buffer before it is queued for plotting.
    BUFFER_SIZE = 100
    
    
    class RealTimePlot(QMainWindow):
        """Main window that plots simulated samples and their per-buffer RMS.

        A daemon thread (``receive_data``) produces buffers of ``BUFFER_SIZE``
        samples and hands them to the GUI through ``self.data_queue``. A
        ``QTimer`` calls ``update_plot`` to drain that queue and refresh one
        curve per entry in ``self.axes``.
        """

        def __init__(self):
            """Build the UI, allocate the plot buffers, start producer and timer."""
            super().__init__()

            self.setup_ui()

            self.serial_port = None  # this example simulates the data source
            self.N = 100  # number of points kept per curve
            self.fs = 1000  # Sampling frequency in Hz (adjust according to your setup)
            self.T = 1 / self.fs
            # Scaling an integer arange always yields exactly N points; a
            # float step can produce one point too many because of rounding.
            self.x_values = np.arange(self.N) * self.T

            # Circular buffers for time domain plots
            self.axes = ['X', 'RMS']

            # One RMS point is produced per BUFFER_SIZE raw samples, so
            # consecutive RMS points are BUFFER_SIZE * T seconds apart.
            self.x_values_by_axis = {
                'X': self.x_values,
                'RMS': self.x_values * BUFFER_SIZE,
            }

            self.z_values = {axis: deque([0] * self.N, maxlen=self.N) for axis in self.axes}
            self.z_index = {axis: 0 for axis in self.axes}

            # Axes variable mapping: plot name -> key in the per-buffer data dict
            self.axes_mapping = {'X': 'x', 'RMS': 'rms'}

            # Plotting setup
            self.setup_plots()

            # queue.Queue does its own locking, so producer and consumer can
            # use it directly. The lock attribute is kept only so existing
            # code that references it keeps working.
            self.data_queue = queue.Queue()
            self.data_queue_lock = threading.Lock()

            # Create and start the receiving thread
            self.receive_thread = threading.Thread(target=self.receive_data)
            self.receive_thread.daemon = True
            self.receive_thread.start()

            # Start the animation: update_plot runs on every timer timeout.
            self.timer = QtCore.QTimer(self)
            self.timer.timeout.connect(self.update_plot)
            self.timer.start(10)

        def setup_ui(self):
            """Create the pyqtgraph layout widget and make it the central widget."""
            self.central_widget = pg.GraphicsLayoutWidget()
            self.setCentralWidget(self.central_widget)

        def setup_plots(self):
            """Create one stacked plot and one curve for each name in ``self.axes``."""
            self.plots = {}
            for row, axis in enumerate(self.axes):
                title = f"<span style='color: #ffffff; font-weight: bold; font-size: 15px'>Time Domain - {axis} Axis</span>"
                self.plots[axis] = self.central_widget.addPlot(row=row, col=0, title=title)

            for plot in self.plots.values():
                plot.setLabel('bottom', 'Time', 's')
                plot.setYRange(-2, 5000)

            curve_pen = pg.mkPen((52, 255, 52), width=2)  # R G B & width
            self.lines = {axis: plot.plot(pen=curve_pen) for axis, plot in self.plots.items()}
            # The RMS plot overrides the range set in the loop above.
            self.plots['RMS'].setYRange(0, 5000)

        def receive_data(self):
            """Produce simulated samples forever and queue them in buffers.

            This is the target of ``self.receive_thread``. A random value below
            256 is generated every 10 ms and stored (truncated to an integer)
            in the current buffer. Once ``BUFFER_SIZE`` samples are stored, the
            buffer is put on ``self.data_queue`` and a new one is started.
            """
            data_buffer = np.zeros(BUFFER_SIZE, dtype=int)
            data_cnt = 0
            while True:
                time.sleep(0.01)
                data_buffer[data_cnt] = random.random() * 256
                data_cnt += 1

                if data_cnt >= BUFFER_SIZE:
                    self.data_queue.put(data_buffer)
                    # Allocate a new array so the queued one is never
                    # overwritten while the GUI is still reading it.
                    data_buffer = np.zeros(BUFFER_SIZE, dtype=int)
                    data_cnt = 0

        def calculate_rms(self, x):
            """Return the root mean square of ``x``.

            The input is converted to float64 before squaring, so integer
            buffers cannot overflow where the default integer type is 32 bits
            wide (e.g. squaring 16-bit sensor readings).
            """
            samples = np.asarray(x, dtype=np.float64)
            return np.sqrt(np.mean(np.square(samples)))

        def update_plot(self):
            """Drain the data queue and redraw the curves once.

            Connected to ``self.timer.timeout``. Every queued buffer adds all
            of its samples to the 'X' curve and one RMS value to the 'RMS'
            curve. Returns the curve items held in ``self.lines``.
            """
            received_any = False
            while True:
                try:
                    data_buffer = self.data_queue.get_nowait()
                except queue.Empty:
                    break

                new_plot_data = {
                    "x": data_buffer,
                    "rms": [self.calculate_rms(data_buffer)],
                }
                for axis in self.axes:
                    self.z_values[axis].extend(new_plot_data[self.axes_mapping[axis]])
                received_any = True

            # Redraw once per timer tick, and only when something changed.
            if received_any:
                for axis in self.axes:
                    self.lines[axis].setData(self.x_values_by_axis[axis], self.z_values[axis])
            return self.lines.values()
    
    
    def closeEvent(self, event):
        """Close the CSV file if one is open, then accept the close event.

        Args:
            self: Object that may carry a ``csv_file`` attribute with a
                ``close()`` method. Nothing in this example creates that
                attribute, so its absence is tolerated instead of raising
                ``AttributeError``.
            event: Close event; its ``accept()`` method is always called.
        """
        csv_file = getattr(self, 'csv_file', None)
        if csv_file is not None:
            csv_file.close()
        event.accept()
    
    
    def main():
        """Create the Qt application, show the plot window and run the event loop.

        The process exits with the status returned by the event loop.
        """
        qt_app = QApplication(sys.argv)
        plot_window = RealTimePlot()
        plot_window.show()
        exit_code = qt_app.exec()
        sys.exit(exit_code)
    
    
    # Launch the GUI only when this file is executed as a script.
    if __name__ == '__main__':
        main()