I made a code reading RPi by serial from pressure sensor. Somehow my code reading raw data from serial sensor but my while
loop does not work and I'm betting blank screen.
What I'm doing wrong? Whole code below:
import serial
import sys
import time
gauge = serial.Serial(
port = '/dev/serial0',
baudrate = 9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout = 1
)
while(True):
while(True):
input_string = list(gauge.read(2)) #watch for two magic bytes
if(input_string[0] == 7 and input_string[1] == 5):
input_string += list(gauge.read(9-2)) #append the rest of the data
if((sum(input_string[1:8]) % 256) == input_string[8]): #compute checksum
break
gauge_value = 10.0**((input_string[4] * 256.0 + input_string[5]) / 4000.0 - 12.5)
if(len(sys.argv) > 2):
print("mBar: {} Pa: {} Status: {} ({}) Error: {} ({})".format(gauge_value,gauge_value*100000.0,
input_string[2],bin(input_string[2]),
input_string[3],bin(input_string[3])))
else:
print("{},{}".format(time.time(), gauge_value))
Here is code reading raw data:
import serial
import time
ser = serial.Serial(
port='/dev/serial0',#Replace ttyUSB0 with ttyAMA0 for Pi1,Pi2,Pi0
baudrate = 9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=1
)
while 1:
x=ser.readline()
print (x)
Raw data as below:
'\n'
b'\x8e\x07\x05@\x08\xbcI(!\xfe\x15A\x00\xec\x1c\x00\x8d\xbc\x87\x1c\x1c\xf4!\x00\n'
b'\x1c\x07\x05\x00\x00\xec\x95\x94\xb5\xf8\x144\x15\xec\x0e\x00\n'
b'\t\x07\x05 \x88n8H$\xa8\x1c\x00\xec\x03\x00\xca{\x07\x15\x90\xb0\xb5?\n'
b'\xf3P\x10\x00\xeb\xb6\x08\x95\xbc(\x1c\x1c\xb4\xfa\x00\n'
So my guess is I made something wrong in while
loop.
For clarification below message format from device manual:
You appear to always be looking for a payload of 9 bytes so it might useful to read the serial connection one byte at a time and store it in a Python deque.
Then after each read check the 9 bytes stored in the deque to see if there is the correct/expected sequence of bytes.
The data you posted in your question did not seem consistent so I extended it using the following example from the manual: the code appears to give the correct answer.
I don't have the sensor so I substituted the serial port with a Python Binary I/O stream with the example data. This is the code I used to mock the data coming from the sensor and use structure for the processing of the incoming data:
from collections import deque
from enum import Enum
import io
PKT_SIZE = 9
# gauge = serial.Serial(
# port = '/dev/serial0',
# baudrate = 9600,
# parity=serial.PARITY_NONE,
# stopbits=serial.STOPBITS_ONE,
# bytesize=serial.EIGHTBITS,
# timeout = 1000
# )
gauge = io.BytesIO((
b"\n\x8e"
b"\x07\x05@\x08\xbcI(!\xfe"
b"\x15A\x00\xec\x1c\x00\x8d\xbc\x87\x1c\x1c\xf4!\x00\n\x1c"
b"\x07\x05\x00\x00\xec\x95\x94\xb5\xf8"
b"\x144\x15\xec\x0e\x00\n\t"
b"\x07\x05 \x88n8H$\xa8"
b"\x1c\x00\xec\x03\x00\xca{"
b"\x07\x15\x90\xb0\xb5?\n\xf3P"
b"\x10\x00\xeb\xb6\x08\x95\xbc(\x1c\x1c\xb4\xfa\x00\n"
b"\x07\x05\x00\x00\xf2\x30\x14\x0a\x45"
))
class Units(Enum):
MBAR = 0
TORR = 1
PA = 2
UNUSED = 3
class ErrorCodes(Enum):
NO_ERROR = 0
PIRANI_ADJ_ERROR = 5
BA_ERROR = 8
PIRANI_ERROR = 9
def checksum_ok(sensor_data):
calc_sum = sum(list(sensor_data)[1:8]) & 0xff
return calc_sum == sensor_data[8]
def process_read(sensor_data):
# print(sensor_data)
(
data_len,
page_num,
status,
error,
hi_byte,
lo_byte,
version,
sen_type,
check_sum,
) = sensor_data
if all(
(
data_len == 7,
page_num == 5,
sen_type == 10
)
):
if not checksum_ok(sensor_data):
print("ERROR: Failed checksum")
return None
# print(f"{status:08b}")
measurement = hi_byte << 8 | lo_byte
units_status = get_units(status)
error_status = get_error_status(error)
if error_status != ErrorCodes.NO_ERROR:
print(f"ERROR: {error_status}")
gauge_value = calc_gauge_value(measurement, units_status)
return gauge_value
return None
def get_units(status_byte):
return Units(status_byte >> 4 & 0x03)
def get_error_status(error_byte):
return ErrorCodes(error_byte >> 4 & 0x0f)
def calc_gauge_value(measurement, units_status):
if units_status == Units.MBAR:
gauge_value = 10.0 ** (measurement / 4000.0 - 12.5)
elif units_status == Units.TORR:
gauge_value = 10.0 ** (measurement / 4000.0 - 12.625)
elif units_status == Units.PA:
gauge_value = 10.0 ** (measurement / 4000.0 - 10.5)
else:
print("ERROR: invalid units setting")
gauge_value = None
return gauge_value
def main():
msg = deque([0] * PKT_SIZE, PKT_SIZE)
while True:
new_byte = gauge.read(1)
msg.append(int.from_bytes(new_byte, "little", signed=False))
# print(f'\t{msg}')
reading = process_read(msg)
if reading:
print(f"\tSensor reading: {reading}")
if sum(msg) == 0:
break
if __name__ == "__main__":
main()