c++serial-portboost-asiocom-port

Reading data from the COM port - Buffer Overflow


I have a problem processing packets from the COM port. This port reads at a speed of 115200. A lot of data from this port is lost. The buffer overflows almost immediately. In the code I attach, I try to remove the overprocessed buffer on the fly.

void COM::ReadLoop(std::shared_ptr<std::vector<unsigned char>> pbuf) {
    if (!pbuf) {
        assert(m_readoperation.expired()); // do not post overlapping read operations
        pbuf = std::make_shared<std::vector<unsigned char>>();
        m_readoperation = pbuf;
    }

    asio::async_read(m_port, asio::dynamic_buffer(*pbuf), asio::transfer_at_least(1),
        [this, pbuf](boost::system::error_code ec, size_t /*length*/) {
            if (!ec) {
                auto& buffer = *pbuf;
                size_t const length = buffer.size();
            
                if (length > 1024) {
                    g_logger.info("buffer overflow");
                    buffer.erase(buffer.begin(), buffer.begin() + length - 1024); 
                }
                m_Data.store(length);

                size_t i = 0;
                while (i < buffer.size() - 2) {
                    if (buffer[i] == 170 && buffer[i + 1] == 170) {
                        size_t len = buffer[i + 2];
                        if (len < 4 || len >= 170 || i + 3 + len > buffer.size()) {
                            i++; //We move the index to continue searching
                            continue;
                        }
                        std::vector<unsigned char> packetData(buffer.begin() + i + 3, buffer.begin() + i + 3 + len);
                        parse_packet(packetData);
                        buffer.erase(buffer.begin(), buffer.begin() + i + 3 + len);
                        i = 0;
                        previousSize = buffer.size();

                    }
                    else {
                        i++;
                    }
                    if (buffer.size() == previousSize) {
                             // If the buffer has not been modified, we continue processing
                             // This prevents infinite looping when there is no more data to process
                        break;
                    }
                }

                ReadLoop(pbuf); 
            }
            else {
                g_logger.info("ReadLoop error: " + ec.message());
            }
        });
}





void COM::parse_packet(const std::vector<unsigned char>& data) {

    if (data.size() < 4) // Minimal packet length check
        return;

    int generated_checksum = 0;
    for (size_t i = 0; i < data.size() - 1; ++i) { // Omit the checksum byte in calculations
        generated_checksum += data[i];
    }
    generated_checksum = 255 - (generated_checksum % 256);

    int checksum = data.back(); // Assume the last byte is the checksum

    if (checksum != generated_checksum) {
        return; // Checksum mismatch
    }


    int i = 0;
    while (i < data.size() - 1) {
        unsigned char data_type = data[i++];
        g_logger.info(std::to_string(data_type));

        switch (data_type) {
            g_logger.info(std::to_string(data_type));
        case 1: { 
            unsigned char battery_level = data[i++];
            m_BatteryLevel.store(battery_level);
            break;
        }
        // here case 2-5
   
        case 0x80: { // Raw data
            i += 2; // We ignore the byte length
            int raw_data = data[i++] << 8;
            raw_data += data[i++];
            if (raw_data >= 32768) {
                raw_data -= 65536;
            }
            m_Raw.store(raw_data);
        }
        case 0x83: { // Power
            std::vector<int> g_power;
            std::ostringstream oss;

            for (int j = 0; j < 8; ++j) {
                int power = data[i + 2] + (data[i + 1] << 8) + (data[i] << 16);
                if (power > 8388608) {
                    power -= 16777216;
                }
                g_power.push_back(power);
                i += 3;
            }
            for (auto power : g_power) {
                oss << power << " ";
            }
            m_PowerG = oss.str();

            break;
        }
                
        default: {
            break;
        }
        }
    }
}

by the way, I have a program written in python that can easily read and process packages.

Few seconds of run program

Initialize port: COM8
Port initialized successfully.
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
128
buffer overflow
128
128

Question How to read data from the buffer so that the buffer does not overflow?

little update

I found the original code. It is written in Delphi. There's some logic here

  if l>512 then l:=l1;
     if l>512 then begin
        ReceivedString := '';
        UnparsedRemainingString:='';
        CurrentPosition:=0;
        l:=0;
     end;
  UnparsedRemainingString:='';
  for i:=1 to l-2 do
     if ReceiveByte()=170 then if ReceiveByte()=170 then begin
        len:=ReceiveByte();
        if len<4 then continue;
        if len>=170 then continue;
        if CurrentPosition+len>Length(ReceivedString) then begin DropParsing; break; end else ParsePacket(len);
     end;
end;

function ReceiveByte() : byte;
begin
  Result:=0;
  if CurrentPosition>=Length(ReceivedString) then exit;
  Result:=byte(ReceivedString[CurrentPosition]);
  CurrentPosition:=CurrentPosition+1;
end;

Procedure SkipByte();
begin
 CurrentPosition:=CurrentPosition+1;
end;

Procedure DropParsing();
var i: integer;
begin
  UnparsedRemainingString:='';
  if CurrentPosition>3 then begin
     for i:=CurrentPosition-3 to Length(ReceivedString)
        do UnparsedRemainingString:=UnparsedRemainingString+ReceivedString[i];
  end else UnparsedRemainingString:='';
  if length(UnparsedRemainingString)>128 then UnparsedRemainingString:='';
  ReceivedString:='';
end;

and this is my actually code in C++

    asio::async_read(m_port, asio::dynamic_buffer(*pbuf), asio::transfer_at_least(1),
        [this, pbuf](boost::system::error_code ec, size_t /*length*/) {
            if (!ec) {
                auto& buffer = *pbuf;
                size_t const length = buffer.size();
                // Przygotowanie do ponownego odczytu, jeśli długość bufora przekracza 512 bajtów
                m_Data.store(length);
                size_t i = 0;
                while (i < buffer.size() - 2) {
                    if (buffer[i] == 170 && buffer[i + 1] == 170) {
                        size_t len = buffer[i + 2];
                        if (len < 4 || len >= 170 || i + 3 + len > buffer.size()) {
                            i++; // Przesuwamy indeks, aby kontynuować przeszukiwanie
                            continue;
                        }
                        std::vector<unsigned char> packetData(buffer.begin() + i + 3, buffer.begin() + i + 3 + len);
                        parse_packet(packetData);
                        i += 3 + len;


                    }
                    else {
                        i++;
                    }

                }
                

                if (length > 32768) {
                    g_logger.info("buffer overflow");
                    buffer.erase(buffer.begin(), buffer.begin() + length - 32768); // Zachowujemy ostatnie 512 bajtów
                }
                

                ReadLoop(pbuf); // Kontynuacja pętli odczytu
            }
            else {
                g_logger.info("ReadLoop error: " + ec.message());
            }
        });
}



128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128

I still don't know how to remove buffer. And I'm not sure if the increase should be there

  1. and += len - 1;
  2. and += 3 + len;

Solution

  • I'm going to interpret the question the way it's likely intended: "how to read continuous stream of logical messages that may arrive in unpredictable transport packets".

    I'd probably extract all the logic into handy functions, so you can express your intent with code:

       m_Data.store(pbuf->size());
    
       // Directly process binary data from buffer here.
       View remain(*pbuf);
       while (auto msg = scan_for_message(remain))
           process(*msg);
    
       pbuf->erase(pbuf->begin(), pbuf->end() - remain.length());
       // Continue reading into the same buffer for new data.
       ReadLoop(pbuf);
    

    The helpers can be:

    using View = std::basic_string_view<uint8_t>;
    
    constexpr uint8_t calc_checksum(View data) {
        unsigned cksum = 0;
        for (uint8_t b : data)
            cksum += b;
        return 255 - (cksum % 256);
    }
    

    That gives us an easy way to validate message payload:

    constexpr bool validate_message_payload(View payload) // signature and length not included
    {
        // size
        if (payload.size() < 4)
            return false; // Minimal packet length
    
        // checksum
        {
            uint8_t const expected = payload.back();
            payload.remove_suffix(1); // Exclude checksum byte
            uint8_t actual = calc_checksum(payload);
    
            g_logger.info("Checksum expected: " + std::to_string(expected) +
                          ", actual: " + std::to_string(actual));
    
            if (expected != actual) {
                g_logger.info("Checksum failed");
                return false;
            }
        }
    
        return true;
    }
    

    And now we can implement scan_for_message:

    // Finds the first *valid* message, discarding preceding data.
    // Returns the payload data span or nullopt if none found.
    constexpr std::optional<View> scan_for_message(View& buffer) {
        while (!buffer.empty()) {
            // Find signature
            static constexpr std::array<uint8_t, 2> signature{0xaa, 0xaa};
            {
                View constexpr sigspan(signature.data(), signature.size());
                auto pos = buffer.find(sigspan);
    
                if (pos == View::npos) {
                    buffer.remove_prefix(buffer.length()); // none of the buffer is relevant
                    return {};
                }
    
                buffer.remove_prefix(pos); // skip noise/partial message debris
            }
            auto candidate = buffer.substr(signature.size());
    
            // Get length
            if (candidate.empty())
                return {}; // incomplete
    
            unsigned payload_len = candidate.front();
    
            if (candidate.length() < payload_len + 1)
                return {}; // incomplete
    
            View payload = candidate.substr(1, payload_len); // skips length byte
    
            if (validate_message_payload(payload)) {
                // remove from buffer, and return payload
                buffer.remove_prefix(signature.size() + 1 + payload_len);
                auto data = payload.substr(0, payload.length() - 1); // throw away checksum
                return data;
            } else {
    
                // Not valid.
    
                // Note: we don't skip the entire payload, just the current header signature, because a valid signature might appear in the middle
                // of the payload just tried to validate
                buffer.remove_prefix(signature.size());
            }
        }
    
        return {};
    }
    

    The whole point of this is to responsibly consume buffer contents, while preserving potential incomplete messages.

    Live Demo

    #include <boost/asio.hpp>
    #include <fmt/ranges.h>
    #include <fmt/printf.h>
    #include <iostream>
    using namespace std::chrono_literals;
    using namespace std::string_literals;
    namespace asio = boost::asio;
    using std::this_thread::sleep_for;
    
    // mocking question code
    struct {
        void info(std::string const& msg) const {
            static auto const start = std::chrono::steady_clock::now();
    
            auto ts = (std::chrono::steady_clock::now() - start);
            if (ts > 1s)
                std::cout << std::setw(6) << ts / 1.s << "s  " << msg << std::endl;
            else
                std::cout << std::setw(6) << ts / 1ms << "ms " << msg << std::endl;
        }
    } static g_logger;
    // end mocks
    
    using View = std::basic_string_view<uint8_t>;
    
    constexpr uint8_t calc_checksum(View data) {
        unsigned cksum = 0;
        for (uint8_t b : data)
            cksum += b;
        return 255 - (cksum % 256);
    }
    
    constexpr bool validate_message_payload(View payload) // signature and length not included
    {
        // size
        if (payload.size() < 4)
            return false; // Minimal packet length
    
        // checksum
        {
            uint8_t const expected = payload.back();
            payload.remove_suffix(1); // Exclude checksum byte
            uint8_t actual = calc_checksum(payload);
    
            g_logger.info("Checksum expected: " + std::to_string(expected) +
                          ", actual: " + std::to_string(actual));
    
            if (expected != actual) {
                g_logger.info("Checksum failed");
                return false;
            }
        }
    
        return true;
    }
    
    // Finds the first *valid* message, discarding preceding data.
    // Returns the payload data span or nullopt if none found.
    constexpr std::optional<View> scan_for_message(View& buffer) {
        while (!buffer.empty()) {
            // Find signature
            static constexpr std::array<uint8_t, 2> signature{0xaa, 0xaa};
            {
                View constexpr sigspan(signature.data(), signature.size());
                auto pos = buffer.find(sigspan);
    
                if (pos == View::npos) {
                    buffer.remove_prefix(buffer.length()); // none of the buffer is relevant
                    return {};
                }
    
                buffer.remove_prefix(pos); // skip noise/partial message debris
            }
            auto candidate = buffer.substr(signature.size());
    
            // Get length
            if (candidate.empty())
                return {}; // incomplete
    
            unsigned payload_len = candidate.front();
    
            if (candidate.length() < payload_len + 1)
                return {}; // incomplete
    
            View payload = candidate.substr(1, payload_len); // skips length byte
    
            if (validate_message_payload(payload)) {
                // remove from buffer, and return payload
                buffer.remove_prefix(signature.size() + 1 + payload_len);
                auto data = payload.substr(0, payload.length() - 1); // throw away checksum
                return data;
            } else {
    
                // Not valid.
    
                // Note: we don't skip the entire payload, just the current header signature, because it might be
                // valid part of a previous partial message, and a next, valid message might appear in the middle
                // of the payload we just tried to validate
                buffer.remove_prefix(signature.size());
            }
        }
    
        return {};
    }
    
    struct COM {
        asio::thread_pool m_ioc{1};
        asio::serial_port m_port{m_ioc};
        std::atomic_int   m_Data = 0;
    
        std::weak_ptr<void> m_readoperation; // only when read operation active
    
        ~COM() { m_ioc.join(); }
    
        void ReadLoop(std::shared_ptr<std::vector<unsigned char>> pbuf = {}) {
            // g_logger.info("Readloop" + (pbuf ? " with buffer"s : ""));
            if (!pbuf) {
                assert(m_readoperation.expired()); // do not post overlapping read operations
                pbuf          = std::make_shared<std::vector<unsigned char>>();
                m_readoperation = pbuf;
            }
    
            async_read(m_port, asio::dynamic_buffer(*pbuf), asio::transfer_at_least(1),
                       [this, pbuf](boost::system::error_code ec, size_t /*length*/) {
                           if (!ec) {
                               try {
                                   m_Data.store(pbuf->size());
    
                                   // Directly process binary data from buffer here.
                                   View remain(*pbuf);
                                   while (auto msg = scan_for_message(remain))
                                       process(*msg);
    
                                   pbuf->erase(pbuf->begin(), pbuf->end() - remain.length());
                                   // Continue reading into the same buffer for new data.
                                   ReadLoop(pbuf);
                               } catch (const std::exception& e) {
                                   g_logger.info("Error processing received data: " + std::string(e.what()));
                               }
                           } else {
                               // Handle read errors.
                               g_logger.info("ReadLoop error: " + ec.message());
                           }
                       });
        }
    
        void process(View data) { // already validated
            fmt::print("Process valid message: {::#02x}\n", std::vector(data.begin(), data.end()));
            for (uint8_t ch : data) {
                switch (ch) {
                case 1: { // Battery level
                          //...
                }
                }
            }
        }
    
        void initialize(bool enable = true) {
            sleep_for(100ms);
    
            m_port.open("COM5");
    
            // Configure serial port settings
            using P = asio::serial_port;
            m_port.set_option(P::baud_rate(115200));
            m_port.set_option(P::character_size(8));
            m_port.set_option(P::parity(P::parity::none));
            m_port.set_option(P::stop_bits(P::stop_bits::one));
            m_port.set_option(P::flow_control(P::flow_control::none));
    
            g_logger.info("Connected");
    
            setEnabled(enable);
        }
    
        void setEnabled(bool enable) {
            if (enable == !m_readoperation.expired())
                return; // nothing to do
    
            if (!enable)
                m_port.cancel(); // cancels read operation (asio::error::operation_aborted)
            else
                ReadLoop();
        }
    
        void terminate() {
            setEnabled(false);
            g_logger.info("Bye!");
        }
    };
    
    // demo code
    int main() {
        std::cout << std::fixed << std::setprecision(2);
        g_logger.info("Demo start");
        COM g_com;
    
        g_com.initialize();
        // g_com.terminate();
    }
    

    With some heavy testing, throwing loads and loads of random binary data at it, which results in "almost valid messages", but of course will rarely pass the checksum test, but a manual test message is accepted:

    enter image description here

    Interestingly, at the end there, it turns out a "valid" message was randomly generated :)

    Process valid message: [0xf5, 0x82, 0x7, 0x94, 0x24, 0x6f, 0x4b, 0x5, 0xf5, 0x56, 0xf3, 0x72, 0x39, 0x3b, 0x97]