pythonraspberry-piserial-port

Serial reading not give expected output


I made a code reading RPi by serial from pressure sensor. Somehow my code reading raw data from serial sensor but my while loop does not work and I'm betting blank screen. What I'm doing wrong? Whole code below:

import serial
import sys
import time

gauge = serial.Serial(
    port = '/dev/serial0',
    baudrate = 9600,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.EIGHTBITS,
    timeout = 1
    )

while(True):
    while(True):
        input_string = list(gauge.read(2)) #watch for two magic bytes
        if(input_string[0] == 7 and input_string[1] == 5):
            input_string += list(gauge.read(9-2)) #append the rest of the data
            if((sum(input_string[1:8]) % 256) == input_string[8]): #compute checksum
                break

    gauge_value = 10.0**((input_string[4] * 256.0 + input_string[5]) / 4000.0 - 12.5)
    if(len(sys.argv) > 2):
        print("mBar: {} Pa: {} Status: {} ({}) Error: {} ({})".format(gauge_value,gauge_value*100000.0,
                                                        input_string[2],bin(input_string[2]),
                                                        input_string[3],bin(input_string[3])))
    else:
        print("{},{}".format(time.time(), gauge_value))

Here is code reading raw data:

import serial
import time

ser = serial.Serial(
        port='/dev/serial0',#Replace ttyUSB0 with ttyAMA0 for Pi1,Pi2,Pi0
        baudrate = 9600,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
        timeout=1
)

while 1:
        x=ser.readline()
        print (x)

Raw data as below:

'\n'
b'\x8e\x07\x05@\x08\xbcI(!\xfe\x15A\x00\xec\x1c\x00\x8d\xbc\x87\x1c\x1c\xf4!\x00\n'
b'\x1c\x07\x05\x00\x00\xec\x95\x94\xb5\xf8\x144\x15\xec\x0e\x00\n'
b'\t\x07\x05 \x88n8H$\xa8\x1c\x00\xec\x03\x00\xca{\x07\x15\x90\xb0\xb5?\n'
b'\xf3P\x10\x00\xeb\xb6\x08\x95\xbc(\x1c\x1c\xb4\xfa\x00\n'

So my guess is I made something wrong in while loop. For clarification below message format from device manual: Manual


Solution

  • You appear to always be looking for a payload of 9 bytes so it might useful to read the serial connection one byte at a time and store it in a Python deque.

    Then after each read check the 9 bytes stored in the deque to see if there is the correct/expected sequence of bytes.

    The data you posted in your question did not seem consistent so I extended it using the following example from the manual: Example data from sensor manual the code appears to give the correct answer.

    I don't have the sensor so I substituted the serial port with a Python Binary I/O stream with the example data. This is the code I used to mock the data coming from the sensor and use structure for the processing of the incoming data:

    from collections import deque
    from enum import Enum
    import io
    
    
    PKT_SIZE = 9
    
    # gauge = serial.Serial(
    #     port = '/dev/serial0',
    #     baudrate = 9600,
    #     parity=serial.PARITY_NONE,
    #     stopbits=serial.STOPBITS_ONE,
    #     bytesize=serial.EIGHTBITS,
    #     timeout = 1000
    #     )
    
    gauge = io.BytesIO((
        b"\n\x8e"
        b"\x07\x05@\x08\xbcI(!\xfe"
        b"\x15A\x00\xec\x1c\x00\x8d\xbc\x87\x1c\x1c\xf4!\x00\n\x1c"
        b"\x07\x05\x00\x00\xec\x95\x94\xb5\xf8"
        b"\x144\x15\xec\x0e\x00\n\t"
        b"\x07\x05 \x88n8H$\xa8"
        b"\x1c\x00\xec\x03\x00\xca{"
        b"\x07\x15\x90\xb0\xb5?\n\xf3P"
        b"\x10\x00\xeb\xb6\x08\x95\xbc(\x1c\x1c\xb4\xfa\x00\n"
        b"\x07\x05\x00\x00\xf2\x30\x14\x0a\x45"
    ))
    
    
    class Units(Enum):
        MBAR = 0
        TORR = 1
        PA = 2
        UNUSED = 3
    
    
    class ErrorCodes(Enum):
        NO_ERROR = 0
        PIRANI_ADJ_ERROR = 5
        BA_ERROR = 8
        PIRANI_ERROR = 9
    
    
    def checksum_ok(sensor_data):
        calc_sum = sum(list(sensor_data)[1:8]) & 0xff
        return calc_sum == sensor_data[8]
    
    
    def process_read(sensor_data):
        # print(sensor_data)
        (
            data_len,
            page_num,
            status,
            error,
            hi_byte,
            lo_byte,
            version,
            sen_type,
            check_sum,
        ) = sensor_data
        if all(
            (
                data_len == 7,
                page_num == 5,
                sen_type == 10
            )
        ):
            if not checksum_ok(sensor_data):
                print("ERROR: Failed checksum")
                return None
            # print(f"{status:08b}")
            measurement = hi_byte << 8 | lo_byte
            units_status = get_units(status)
            error_status = get_error_status(error)
            if error_status != ErrorCodes.NO_ERROR:
                print(f"ERROR: {error_status}")
            gauge_value = calc_gauge_value(measurement, units_status)
            return gauge_value
        return None
    
    
    def get_units(status_byte):
        return Units(status_byte >> 4 & 0x03)
    
    
    def get_error_status(error_byte):
        return ErrorCodes(error_byte >> 4 & 0x0f)
    
    
    def calc_gauge_value(measurement, units_status):
        if units_status == Units.MBAR:
            gauge_value = 10.0 ** (measurement / 4000.0 - 12.5)
        elif units_status == Units.TORR:
            gauge_value = 10.0 ** (measurement / 4000.0 - 12.625)
        elif units_status == Units.PA:
            gauge_value = 10.0 ** (measurement / 4000.0 - 10.5)
        else:
            print("ERROR: invalid units setting")
            gauge_value = None
        return gauge_value
    
    
    def main():
        msg = deque([0] * PKT_SIZE, PKT_SIZE)
        while True:
            new_byte = gauge.read(1)
            msg.append(int.from_bytes(new_byte, "little", signed=False))
            # print(f'\t{msg}')
            reading = process_read(msg)
            if reading:
                print(f"\tSensor reading: {reading}")
            if sum(msg) == 0:
                break
    
    
    if __name__ == "__main__":
        main()