The code below plots numbers received from a serial port. However, the values of `locals()[self.axes_mapping[axis]]` that end up on the plot do not include all of the data received by the function `receive_data`. It seems some values are lost. If I print the values of `x`, they match the received data. How do I make sure that all of the received data is plotted?
import sys
from PyQt6.QtWidgets import QApplication, QMainWindow
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore
import numpy as np
from collections import deque
import threading
import queue
import serial
BUFFER_SIZE = 100
class RealTimePlot(QMainWindow):
    """Main window that plots values read from a serial port in real time.

    A background thread (``receive_data``) collects samples into blocks of
    ``BUFFER_SIZE`` and puts each block on ``data_queue``. A ``QTimer``
    calls ``update_plot`` to drain the queue and refresh two curves, one
    titled "X" and one titled "RMS".
    """

    def __init__(self):
        """Build the UI, open the serial port, and start the reader thread and timer."""
        super().__init__()
        self.setup_ui()
        # Serial source: port 'COM6', opened with 115200 as the second argument.
        self.serial_port = serial.Serial('COM6', 115200)
        # Number of points kept in each plot buffer.
        self.N = 100
        self.fs = 1000 # Sampling frequency in Hz (adjust according to your setup)
        # Sample period derived from fs.
        self.T = 1/self.fs
        # X coordinates shared by both curves: 0 up to N*T in steps of T.
        self.x_values = np.arange(0, self.N*self.T, self.T)
        # Circular buffers for time domain plots
        self.axes = ['X', 'RMS']
        # One deque per axis, pre-filled with N zeros and capped at N entries.
        self.z_values = {axis: deque([0] * self.N, maxlen=self.N) for axis in self.axes}
        self.z_index = {axis: 0 for axis in self.axes}
        # Axes variable mapping
        # Maps each axis name to the name of a local variable in update_plot.
        self.axes_mapping = {'X': 'x', 'RMS': 'rms'}
        # Plotting setup
        self.setup_plots()
        # Hands completed sample blocks from the reader thread to the GUI thread.
        self.data_queue = queue.Queue()
        # Lock for synchronizing access to the data queue
        self.data_queue_lock = threading.Lock()
        # Create and start the receiving thread
        self.receive_thread = threading.Thread(target=self.receive_data)
        self.receive_thread.daemon = True
        self.receive_thread.start()
        # Start the animation
        self.timer = QtCore.QTimer(self)
        self.timer.timeout.connect(self.update_plot)
        # Timer interval argument is 10.
        self.timer.start(10)

    def setup_ui(self):
        """Create the pyqtgraph layout widget and install it as the central widget."""
        self.central_widget = pg.GraphicsLayoutWidget()
        self.setCentralWidget(self.central_widget)

    def setup_plots(self):
        """Create one plot per entry in ``self.axes``, stacked in column 0, with one curve each."""
        self.plots = {axis: self.central_widget.addPlot(row=i, col=0, title=f"<span style='color: #ffffff; font-weight: bold; font-size: 15px'>Time Domain - {axis} Axis</span>")
                      for i, axis in enumerate(self.axes)}
        for plot in self.plots.values():
            plot.setLabel('bottom', 'Time', 's')
            # plot.setLabel('left', 'Amplitude', 'g')
            plot.setYRange(-2, 5000)
        linha1 = pg.mkPen((52, 255, 52), width=2) # R G B & width
        # linha4 = pg.mkPen((255, 255, 255), width=2)
        # One curve per plot, all drawn with the same pen.
        self.lines = {axis: plot.plot(pen=linha1) for axis, plot in self.plots.items()}
        # self.lines_fft = {axis: plot.plot(pen=linha4) for axis, plot in self.plots_fft.items()}
        # Overrides the Y range set in the loop above for the RMS plot only.
        self.plots['RMS'].setYRange(0, 5000)

    def receive_data(self):
        """Read 2-byte samples from the serial port forever and queue them in blocks.

        Used as the target of the thread started in ``__init__``. Each
        sample is decoded as an unsigned little-endian integer. Once
        ``BUFFER_SIZE`` samples have been stored, the block is put on
        ``data_queue`` and a fresh buffer is started.
        """
        data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
        data_cnt = 0
        rx_flag = True
        # NOTE(review): rx_flag starts as True, but data_str is only assigned
        # inside the `if` below. If no data is waiting on the first pass, the
        # store into data_buffer would reference an unassigned name - confirm.
        while True:
            if self.serial_port.in_waiting > 0.0: # Check if there is data waiting
                data_str = int.from_bytes(self.serial_port.read(2), byteorder='little', signed = False)
                rx_flag = True
            with self.data_queue_lock:
                # Store the sample only once per successful read.
                if rx_flag == True:
                    data_buffer[data_cnt] = data_str
                    data_cnt = data_cnt+1
                    rx_flag = False
                # Check if the buffer size is reached, then update the plot
                if data_cnt >= BUFFER_SIZE:
                    self.data_queue.put(data_buffer)
                    # Allocate a new array so the queued block is not overwritten.
                    data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
                    data_cnt = 0

    def calculate_rms(self, x):
        """Return the square root of the mean of the squares of ``x``."""
        return np.sqrt(np.mean(np.square([x])))

    def update_plot(self):
        """Drain ``data_queue`` and refresh both curves; connected to the timer's timeout.

        Returns:
            The values view of ``self.lines``.
        """
        with self.data_queue_lock:
            while not self.data_queue.empty():
                data_buffer = self.data_queue.get()
                # The loop body only rebinds x, so after the loop x holds the
                # LAST element of data_buffer; the other elements never reach
                # the plot buffers below.
                for data_str in data_buffer:
                    x = data_str
                rms = self.calculate_rms(data_buffer)
                for axis in self.axes:
                    # Looks up the local variable named by axes_mapping ('x' or
                    # 'rms') and appends that single value to the axis buffer.
                    self.z_values[axis].append(locals()[self.axes_mapping[axis]])
                    self.lines[axis].setData(self.x_values, self.z_values[axis])
                    print(locals()[self.axes_mapping[axis]])
        return self.lines.values()

    def closeEvent(self, event):
        """Close ``self.csv_file`` and accept the close event."""
        # NOTE(review): no code in this class assigns self.csv_file, so this
        # line would raise AttributeError unless it is set elsewhere - confirm.
        self.csv_file.close()
        event.accept()
def main():
    """Create the Qt application, show the plot window, and run the event loop.

    The interpreter exits with the status returned by the event loop.
    """
    qt_app = QApplication(sys.argv)
    plot_window = RealTimePlot()
    plot_window.show()
    exit_status = qt_app.exec()
    sys.exit(exit_status)
# Launch the GUI only when this file is run as a script, not when imported.
if __name__ == '__main__':
    main()
The example is from here
You have a bug. This code
# Each pass rebinds x, so only the last sample of the buffer survives the loop.
for data_str in data_buffer:
    x = data_str # x = one item, drops the rest
rms = self.calculate_rms(data_buffer)
for axis in self.axes:
    # append() adds exactly one value per axis for the whole buffer.
    self.z_values[axis].append(locals()[self.axes_mapping[axis]])
    self.lines[axis].setData(self.x_values, self.z_values[axis])
should be
x = data_buffer # x = all items
# Wrapped in a list so that extend() below can iterate over it.
rms = [self.calculate_rms(data_buffer)]
for axis in self.axes: # note the use of extend for list/array
    self.z_values[axis].extend(locals()[self.axes_mapping[axis]])
    self.lines[axis].setData(self.x_values, self.z_values[axis])
Also, the two plots should have different time scales, since each point on the second one is the RMS of 100 points from the first one.
Lastly, the use of `locals()` is strongly discouraged: it is brittle and gets in the way of future refactoring or code modification. I would use a dictionary instead:
# Keys match the values of self.axes_mapping ('x' and 'rms').
new_plot_data = {} # replaces locals()
new_plot_data["x"] = data_buffer
new_plot_data["rms"] = [self.calculate_rms(data_buffer)]
Full example (without using pyserial):
import random
import time
import sys
from PyQt6.QtWidgets import QApplication, QMainWindow
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore
import numpy as np
from collections import deque
import threading
import queue
import serial
BUFFER_SIZE = 100
class RealTimePlot(QMainWindow):
    """Main window that plots a live sample stream and its per-block RMS.

    A background thread (``receive_data``) fills blocks of ``BUFFER_SIZE``
    samples and hands each completed block to the GUI thread through
    ``data_queue``. A ``QTimer`` calls ``update_plot`` to drain the queue
    and redraw two curves: "X" (every raw sample) and "RMS" (one value per
    block).
    """

    def __init__(self):
        """Build the UI, allocate the plot buffers, and start the producer thread and timer."""
        super().__init__()
        self.setup_ui()
        # This example generates random samples, so no port is opened.
        self.serial_port = None
        # Number of points kept on screen for each curve.
        self.N = 100
        self.fs = 1000  # Sampling frequency in Hz (adjust according to your setup)
        self.T = 1 / self.fs
        # Scale an integer ramp instead of using a float-step arange: the
        # result is guaranteed to hold exactly N points, so it always matches
        # the length of the plot buffers passed to setData.
        self.x_values = np.arange(self.N) * self.T
        # Circular buffers for time domain plots
        self.axes = ['X', 'RMS']
        self.z_values = {axis: deque([0] * self.N, maxlen=self.N) for axis in self.axes}
        self.z_index = {axis: 0 for axis in self.axes}
        # Axes variable mapping: axis name -> key used in update_plot's data dict.
        self.axes_mapping = {'X': 'x', 'RMS': 'rms'}
        # Each RMS point summarises BUFFER_SIZE raw samples, so consecutive
        # RMS points are BUFFER_SIZE sample periods apart on the time axis.
        self.axis_x_values = {
            'X': self.x_values,
            'RMS': self.x_values * BUFFER_SIZE,
        }
        # Plotting setup
        self.setup_plots()
        # queue.Queue does its own locking, so producer and consumer can use
        # it directly from different threads.
        self.data_queue = queue.Queue()
        # No longer taken by this class; kept so that code which references
        # the attribute keeps working.
        self.data_queue_lock = threading.Lock()
        # Create and start the receiving thread
        self.receive_thread = threading.Thread(target=self.receive_data)
        self.receive_thread.daemon = True
        self.receive_thread.start()
        # Start the animation
        self.timer = QtCore.QTimer(self)
        self.timer.timeout.connect(self.update_plot)
        self.timer.start(10)

    def setup_ui(self):
        """Create the pyqtgraph layout widget and install it as the central widget."""
        self.central_widget = pg.GraphicsLayoutWidget()
        self.setCentralWidget(self.central_widget)

    def setup_plots(self):
        """Create one plot per entry in ``self.axes``, stacked in column 0, with one curve each."""
        self.plots = {
            axis: self.central_widget.addPlot(
                row=i,
                col=0,
                title=f"<span style='color: #ffffff; font-weight: bold; font-size: 15px'>Time Domain - {axis} Axis</span>",
            )
            for i, axis in enumerate(self.axes)
        }
        for plot in self.plots.values():
            plot.setLabel('bottom', 'Time', 's')
            plot.setYRange(-2, 5000)
        linha1 = pg.mkPen((52, 255, 52), width=2)  # R G B & width
        self.lines = {axis: plot.plot(pen=linha1) for axis, plot in self.plots.items()}
        # The RMS plot overrides the Y range set in the loop above.
        self.plots['RMS'].setYRange(0, 5000)

    def receive_data(self):
        """Generate samples forever and enqueue them in blocks of ``BUFFER_SIZE``.

        Used as the target of the daemon thread started in ``__init__``.
        Never returns.
        """
        data_buffer = np.zeros(BUFFER_SIZE, dtype=int)
        data_cnt = 0
        while True:
            time.sleep(0.01)
            # Storing a float into the int buffer truncates it to an integer.
            data_buffer[data_cnt] = random.random() * 256
            data_cnt += 1
            # Hand over a full block and start a fresh array, so the block
            # already on the queue is never overwritten.
            if data_cnt >= BUFFER_SIZE:
                self.data_queue.put(data_buffer)
                data_buffer = np.zeros(BUFFER_SIZE, dtype=int)
                data_cnt = 0

    def calculate_rms(self, x):
        """Return the root mean square of ``x``.

        Args:
            x: Array-like of numeric samples.

        Returns:
            The RMS as a NumPy float.
        """
        # Square in floating point: squaring 16-bit-range samples in a 32-bit
        # integer array would overflow and corrupt the result.
        samples = np.asarray(x, dtype=float)
        return np.sqrt(np.mean(np.square(samples)))

    def update_plot(self):
        """Move every queued block into the plot buffers and redraw the curves.

        Connected to the timer's ``timeout`` signal. All samples of a block
        go to the "X" buffer; the block's RMS goes to the "RMS" buffer.

        Returns:
            The values view of ``self.lines``.
        """
        received = False
        while True:
            # get_nowait() avoids the check-then-get race of empty() + get().
            try:
                data_buffer = self.data_queue.get_nowait()
            except queue.Empty:
                break
            received = True
            new_plot_data = {
                "x": data_buffer,
                "rms": [self.calculate_rms(data_buffer)],
            }
            for axis in self.axes:
                self.z_values[axis].extend(new_plot_data[self.axes_mapping[axis]])
        # Redraw once per tick, and only if something new arrived.
        if received:
            for axis in self.axes:
                self.lines[axis].setData(self.axis_x_values[axis], self.z_values[axis])
        return self.lines.values()

    def closeEvent(self, event):
        """Close the optional ``csv_file`` attribute, if one was set, and accept the event."""
        # This class never creates csv_file; guard the access so that closing
        # the window does not raise AttributeError.
        csv_file = getattr(self, "csv_file", None)
        if csv_file is not None:
            csv_file.close()
        event.accept()
def main():
    """Start the Qt application, display the plot window, and block in the event loop.

    The interpreter exits with the status returned by the event loop.
    """
    qt_app = QApplication(sys.argv)
    plot_window = RealTimePlot()
    plot_window.show()
    exit_status = qt_app.exec()
    sys.exit(exit_status)
# Launch the GUI only when this file is run as a script, not when imported.
if __name__ == '__main__':
    main()