I’m working on a project that involves a Raspberry Pi 5 running a Python-based GUI and an RP2040 (running MicroPython) connected to a set of I2C sensors and 32 servos controlled by two PCA9685 PWM controllers.
My goal is to collect data from the sensors at a frequency of at least 1Hz, but preferably 10Hz+. The data consists of approximately 256 float variables from these sensors, and I need to verify the integrity of the data with a checksum.
Additionally, I need to update the 32 servos at a frequency between 1-10Hz, passing 4-digit integers to them. (0-4096)
As a proof of concept, I tried using serial communication between a Raspberry Pi 5 and Raspberry Pi Pico, with the Pico connected to one PCA9685 I2C breakout board. However, I’ve been encountering timing issues, missing/corrupted data, and problems getting the checksum to work properly.
I’m wondering if I’m being too ambitious with this solution, or if there are better alternatives out there. Any advice would be greatly appreciated. Thanks in advance to anyone who reads this.
Here’s the code I’ve been using:
Raspberry Pi 5 Code:
import serial
import json
import hashlib
from random import randint
def compute_checksum(data):
return hashlib.md5(data.encode()).hexdigest()
def main():
s = serial.Serial(port="/dev/ttyACM0", baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=2) # reduced timeout
s.flush()
while True:
try:
data = [randint(0, 4096) for _ in range(16)] # replace with your actual data
json_data = json.dumps(data)
checksum = compute_checksum(json_data)
s.write((json_data + '|' + checksum + '\r').encode())
s.flush() # flush the buffer
if s.in_waiting > 0:
response = s.read_until().strip()
print('Received:', response.decode())
except Exception as e:
print(f"An error occurred: {e}")
break
if __name__ == "__main__":
main()
Raspberry Pi Pico Code:
import board
import busio
import ujson
import select
import sys
from adafruit_pca9685 import PCA9685
# Create the I2C bus interface.
i2c = busio.I2C(board.GP17, board.GP16) # Pi Pico RP2040
# Create a simple PCA9685 class instance.
pca = PCA9685(i2c)
# Set the PWM frequency to 60hz.
pca.frequency = 1000
def compute_checksum(data):
h = uhashlib.md5()
h.update(data)
return ubinascii.hexlify(h.digest()).decode()
# Set up the poll object
poll_obj = select.poll()
poll_obj.register(sys.stdin, select.POLLIN)
# Loop indefinitely
while True:
try:
# Wait for input on stdin
poll_results = poll_obj.poll(0) # the '0' is how long it will wait for message before looping again (in microseconds)
if poll_results:
# Read the data from stdin (read data coming from PC)
received_data = sys.stdin.readline().strip()
data, received_checksum = received_data.split('|')
try:
values = ujson.loads(data)
computed_checksum = compute_checksum(data)
if computed_checksum == received_checksum:
for i in range(16):
pca.channels[i].duty_cycle = values[i]
sys.stdout.write("received data: " + data + "\r")
else:
sys.stdout.write("checksum error\r")
except ValueError:
sys.stdout.write("invalid json: " + data + "\r")
except Exception as e:
print(f"An error occurred: {e}")
break
I have two suggestions:
Let's look at each idea in a separate section. You can use either or both ideas - the first being the simpler.
Your baud rate of 9,600 bits/s is pretty slow and most modern devices can manage much more than that. When you convert the bits to bytes, that is only 1200 bytes/s. Given that you will lose a couple of bits for parity and stop/start for every byte, you will only get around 1000 bytes/s.
I would suggest you try:
s = serial.Serial(port="/dev/ttyACM0", baudrate=115200 ...
Failing that, step down through 96000, 57600 etc.
Secondly, your packing is inefficient. When you convert your list to JSON and add a hex digest, it takes around 130 bytes. You can only transmit that 7 times a second at 1000 bytes/s. I would suggest a much more compact frame as follows:
ff ff <16 off 2-byte shorts> CRC
where ff ff
is a header, followed by your 16 values as 2-bytes unsigned shorts and an 8-bit CRC. That makes a 35 bytes frame, or around 75% less than you currently have.
Here is some code to pack a list of values into a frame and unpack a frame into a list of values with checking:
#!/usr/bin/env python3
import sys
import crc8
import struct
import random
import binascii
def Pack(values):
"""Packs the list of values into a packet with checksum"""
# The 32 byte payload of 16 values
payload = struct.pack('!16H',*values)
# Calculate CRC over payload
CRC = crc8.crc8()
CRC.update(payload)
# Assemble header + payload + CRC into a packet
packet = b'\xff\xff' + payload + CRC.digest()
return packet
def Unpack(packet):
"""Unpacks a packet and returns a list of values or empty list if CRC incorrect or other error"""
# Extract components of packet
hdr = packet[:2]
payload = packet[2:34]
packetCRC = packet[-1]
# Check length of packet is correct
if len(packet) != 35:
print(f'ERROR: Packet length incorrect, received {len(packet)} bytes, expected 35', file=sys.stderr)
return []
# Check header is as expected
if hdr != b'\xff\xff':
print(f'ERROR: Packet header incorrect, received {hdr}, expected <ff> <ff>', file=sys.stderr)
return []
# Check CRC
CRC = crc8.crc8()
CRC.update(payload)
if packetCRC != int.from_bytes(CRC.digest()):
print(f'ERROR: Packet CRC incorrect', file=sys.stderr)
return []
# Everything ok, unpack and return list of 16 values
values = struct.unpack('!16H',payload)
return values
if __name__ == "__main__":
# Generate list of random values
values = [ random.randint(0,65535) for i in range(16)]
print('Initial list: ', *values)
# Assemble values into a packet
packet = Pack(values)
print("<<<UNPACK>>>\n\n")
# Unpack values from packet
result = Unpack(packet)
print('Unpacked list: ', *values)
Your code will need to change so that you read/write exactly 35 bytes rather than looking for newlines and JSON strings.
I put the header on there so you can re-sync if there are any errors. Basically, if you call Unpack()
and get an empty list back, you have lost sync. In that case you need to read until you get ff
twice in a row and then read the following 33 bytes (payload plus CRC) and pass it (plus a header of ff ff
) to Unpack()
to get re-synced.
Note that your data values can be represented in 12 bits rather than 16. So, rather than packing each sample into 16 bits for 2 bytes/value, you could pack 2 samples into 24 bits for 1.5 bytes/value. It doesn't currently seem worth the effort to do that bit-shifting though.