I have a problem processing packets from the COM port. This port reads at a speed of 115200. A lot of data from this port is lost. The buffer overflows almost immediately. In the code I attach, I try to remove the overprocessed buffer on the fly.
void COM::ReadLoop(std::shared_ptr<std::vector<unsigned char>> pbuf) {
if (!pbuf) {
assert(m_readoperation.expired()); // do not post overlapping read operations
pbuf = std::make_shared<std::vector<unsigned char>>();
m_readoperation = pbuf;
}
asio::async_read(m_port, asio::dynamic_buffer(*pbuf), asio::transfer_at_least(1),
[this, pbuf](boost::system::error_code ec, size_t /*length*/) {
if (!ec) {
auto& buffer = *pbuf;
size_t const length = buffer.size();
if (length > 1024) {
g_logger.info("buffer overflow");
buffer.erase(buffer.begin(), buffer.begin() + length - 1024);
}
m_Data.store(length);
size_t i = 0;
while (i < buffer.size() - 2) {
if (buffer[i] == 170 && buffer[i + 1] == 170) {
size_t len = buffer[i + 2];
if (len < 4 || len >= 170 || i + 3 + len > buffer.size()) {
i++; //We move the index to continue searching
continue;
}
std::vector<unsigned char> packetData(buffer.begin() + i + 3, buffer.begin() + i + 3 + len);
parse_packet(packetData);
buffer.erase(buffer.begin(), buffer.begin() + i + 3 + len);
i = 0;
previousSize = buffer.size();
}
else {
i++;
}
if (buffer.size() == previousSize) {
// If the buffer has not been modified, we continue processing
// This prevents infinite looping when there is no more data to process
break;
}
}
ReadLoop(pbuf);
}
else {
g_logger.info("ReadLoop error: " + ec.message());
}
});
}
void COM::parse_packet(const std::vector<unsigned char>& data) {
if (data.size() < 4) // Minimal packet length check
return;
int generated_checksum = 0;
for (size_t i = 0; i < data.size() - 1; ++i) { // Omit the checksum byte in calculations
generated_checksum += data[i];
}
generated_checksum = 255 - (generated_checksum % 256);
int checksum = data.back(); // Assume the last byte is the checksum
if (checksum != generated_checksum) {
return; // Checksum mismatch
}
int i = 0;
while (i < data.size() - 1) {
unsigned char data_type = data[i++];
g_logger.info(std::to_string(data_type));
switch (data_type) {
g_logger.info(std::to_string(data_type));
case 1: {
unsigned char battery_level = data[i++];
m_BatteryLevel.store(battery_level);
break;
}
// here case 2-5
case 0x80: { // Raw data
i += 2; // We ignore the byte length
int raw_data = data[i++] << 8;
raw_data += data[i++];
if (raw_data >= 32768) {
raw_data -= 65536;
}
m_Raw.store(raw_data);
}
case 0x83: { // Power
std::vector<int> g_power;
std::ostringstream oss;
for (int j = 0; j < 8; ++j) {
int power = data[i + 2] + (data[i + 1] << 8) + (data[i] << 16);
if (power > 8388608) {
power -= 16777216;
}
g_power.push_back(power);
i += 3;
}
for (auto power : g_power) {
oss << power << " ";
}
m_PowerG = oss.str();
break;
}
default: {
break;
}
}
}
}
by the way, I have a program written in python that can easily read and process packages.
Few seconds of run program
Initialize port: COM8
Port initialized successfully.
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
128
buffer overflow
128
128
Question How to read data from the buffer so that the buffer does not overflow?
little update
I found the original code. It is written in Delphi. There's some logic here
if l>512 then l:=l1;
if l>512 then begin
ReceivedString := '';
UnparsedRemainingString:='';
CurrentPosition:=0;
l:=0;
end;
UnparsedRemainingString:='';
for i:=1 to l-2 do
if ReceiveByte()=170 then if ReceiveByte()=170 then begin
len:=ReceiveByte();
if len<4 then continue;
if len>=170 then continue;
if CurrentPosition+len>Length(ReceivedString) then begin DropParsing; break; end else ParsePacket(len);
end;
end;
function ReceiveByte() : byte;
begin
Result:=0;
if CurrentPosition>=Length(ReceivedString) then exit;
Result:=byte(ReceivedString[CurrentPosition]);
CurrentPosition:=CurrentPosition+1;
end;
Procedure SkipByte();
begin
CurrentPosition:=CurrentPosition+1;
end;
Procedure DropParsing();
var i: integer;
begin
UnparsedRemainingString:='';
if CurrentPosition>3 then begin
for i:=CurrentPosition-3 to Length(ReceivedString)
do UnparsedRemainingString:=UnparsedRemainingString+ReceivedString[i];
end else UnparsedRemainingString:='';
if length(UnparsedRemainingString)>128 then UnparsedRemainingString:='';
ReceivedString:='';
end;
and this is my actually code in C++
asio::async_read(m_port, asio::dynamic_buffer(*pbuf), asio::transfer_at_least(1),
[this, pbuf](boost::system::error_code ec, size_t /*length*/) {
if (!ec) {
auto& buffer = *pbuf;
size_t const length = buffer.size();
// Przygotowanie do ponownego odczytu, jeśli długość bufora przekracza 512 bajtów
m_Data.store(length);
size_t i = 0;
while (i < buffer.size() - 2) {
if (buffer[i] == 170 && buffer[i + 1] == 170) {
size_t len = buffer[i + 2];
if (len < 4 || len >= 170 || i + 3 + len > buffer.size()) {
i++; // Przesuwamy indeks, aby kontynuować przeszukiwanie
continue;
}
std::vector<unsigned char> packetData(buffer.begin() + i + 3, buffer.begin() + i + 3 + len);
parse_packet(packetData);
i += 3 + len;
}
else {
i++;
}
}
if (length > 32768) {
g_logger.info("buffer overflow");
buffer.erase(buffer.begin(), buffer.begin() + length - 32768); // Zachowujemy ostatnie 512 bajtów
}
ReadLoop(pbuf); // Kontynuacja pętli odczytu
}
else {
g_logger.info("ReadLoop error: " + ec.message());
}
});
}
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
I still don't know how to remove buffer. And I'm not sure if the increase should be there
I'm going to interpret the question the way it's likely intended: "how to read continuous stream of logical messages that may arrive in unpredictable transport packets".
I'd probably extract all the logic into handy functions, so you can express your intent with code:
m_Data.store(pbuf->size());
// Directly process binary data from buffer here.
View remain(*pbuf);
while (auto msg = scan_for_message(remain))
process(*msg);
pbuf->erase(pbuf->begin(), pbuf->end() - remain.length());
// Continue reading into the same buffer for new data.
ReadLoop(pbuf);
The helpers can be:
using View = std::basic_string_view<uint8_t>;
constexpr uint8_t calc_checksum(View data) {
unsigned cksum = 0;
for (uint8_t b : data)
cksum += b;
return 255 - (cksum % 256);
}
That gives us an easy way to validate message payload:
constexpr bool validate_message_payload(View payload) // signature and length not included
{
// size
if (payload.size() < 4)
return false; // Minimal packet length
// checksum
{
uint8_t const expected = payload.back();
payload.remove_suffix(1); // Exclude checksum byte
uint8_t actual = calc_checksum(payload);
g_logger.info("Checksum expected: " + std::to_string(expected) +
", actual: " + std::to_string(actual));
if (expected != actual) {
g_logger.info("Checksum failed");
return false;
}
}
return true;
}
And now we can implement scan_for_message
:
// Finds the first *valid* message, discarding preceding data.
// Returns the payload data span or nullopt if none found.
constexpr std::optional<View> scan_for_message(View& buffer) {
while (!buffer.empty()) {
// Find signature
static constexpr std::array<uint8_t, 2> signature{0xaa, 0xaa};
{
View constexpr sigspan(signature.data(), signature.size());
auto pos = buffer.find(sigspan);
if (pos == View::npos) {
buffer.remove_prefix(buffer.length()); // none of the buffer is relevant
return {};
}
buffer.remove_prefix(pos); // skip noise/partial message debris
}
auto candidate = buffer.substr(signature.size());
// Get length
if (candidate.empty())
return {}; // incomplete
unsigned payload_len = candidate.front();
if (candidate.length() < payload_len + 1)
return {}; // incomplete
View payload = candidate.substr(1, payload_len); // skips length byte
if (validate_message_payload(payload)) {
// remove from buffer, and return payload
buffer.remove_prefix(signature.size() + 1 + payload_len);
auto data = payload.substr(0, payload.length() - 1); // throw away checksum
return data;
} else {
// Not valid.
// Note: we don't skip the entire payload, just the current header signature, because a valid signature might appear in the middle
// of the payload just tried to validate
buffer.remove_prefix(signature.size());
}
}
return {};
}
The whole point of this is to responsibly consume buffer contents, while preserving potential incomplete messages.
#include <boost/asio.hpp>
#include <fmt/ranges.h>
#include <fmt/printf.h>
#include <iostream>
using namespace std::chrono_literals;
using namespace std::string_literals;
namespace asio = boost::asio;
using std::this_thread::sleep_for;
// mocking question code
struct {
void info(std::string const& msg) const {
static auto const start = std::chrono::steady_clock::now();
auto ts = (std::chrono::steady_clock::now() - start);
if (ts > 1s)
std::cout << std::setw(6) << ts / 1.s << "s " << msg << std::endl;
else
std::cout << std::setw(6) << ts / 1ms << "ms " << msg << std::endl;
}
} static g_logger;
// end mocks
using View = std::basic_string_view<uint8_t>;
constexpr uint8_t calc_checksum(View data) {
unsigned cksum = 0;
for (uint8_t b : data)
cksum += b;
return 255 - (cksum % 256);
}
constexpr bool validate_message_payload(View payload) // signature and length not included
{
// size
if (payload.size() < 4)
return false; // Minimal packet length
// checksum
{
uint8_t const expected = payload.back();
payload.remove_suffix(1); // Exclude checksum byte
uint8_t actual = calc_checksum(payload);
g_logger.info("Checksum expected: " + std::to_string(expected) +
", actual: " + std::to_string(actual));
if (expected != actual) {
g_logger.info("Checksum failed");
return false;
}
}
return true;
}
// Finds the first *valid* message, discarding preceding data.
// Returns the payload data span or nullopt if none found.
constexpr std::optional<View> scan_for_message(View& buffer) {
while (!buffer.empty()) {
// Find signature
static constexpr std::array<uint8_t, 2> signature{0xaa, 0xaa};
{
View constexpr sigspan(signature.data(), signature.size());
auto pos = buffer.find(sigspan);
if (pos == View::npos) {
buffer.remove_prefix(buffer.length()); // none of the buffer is relevant
return {};
}
buffer.remove_prefix(pos); // skip noise/partial message debris
}
auto candidate = buffer.substr(signature.size());
// Get length
if (candidate.empty())
return {}; // incomplete
unsigned payload_len = candidate.front();
if (candidate.length() < payload_len + 1)
return {}; // incomplete
View payload = candidate.substr(1, payload_len); // skips length byte
if (validate_message_payload(payload)) {
// remove from buffer, and return payload
buffer.remove_prefix(signature.size() + 1 + payload_len);
auto data = payload.substr(0, payload.length() - 1); // throw away checksum
return data;
} else {
// Not valid.
// Note: we don't skip the entire payload, just the current header signature, because it might be
// valid part of a previous partial message, and a next, valid message might appear in the middle
// of the payload we just tried to validate
buffer.remove_prefix(signature.size());
}
}
return {};
}
struct COM {
asio::thread_pool m_ioc{1};
asio::serial_port m_port{m_ioc};
std::atomic_int m_Data = 0;
std::weak_ptr<void> m_readoperation; // only when read operation active
~COM() { m_ioc.join(); }
void ReadLoop(std::shared_ptr<std::vector<unsigned char>> pbuf = {}) {
// g_logger.info("Readloop" + (pbuf ? " with buffer"s : ""));
if (!pbuf) {
assert(m_readoperation.expired()); // do not post overlapping read operations
pbuf = std::make_shared<std::vector<unsigned char>>();
m_readoperation = pbuf;
}
async_read(m_port, asio::dynamic_buffer(*pbuf), asio::transfer_at_least(1),
[this, pbuf](boost::system::error_code ec, size_t /*length*/) {
if (!ec) {
try {
m_Data.store(pbuf->size());
// Directly process binary data from buffer here.
View remain(*pbuf);
while (auto msg = scan_for_message(remain))
process(*msg);
pbuf->erase(pbuf->begin(), pbuf->end() - remain.length());
// Continue reading into the same buffer for new data.
ReadLoop(pbuf);
} catch (const std::exception& e) {
g_logger.info("Error processing received data: " + std::string(e.what()));
}
} else {
// Handle read errors.
g_logger.info("ReadLoop error: " + ec.message());
}
});
}
void process(View data) { // already validated
fmt::print("Process valid message: {::#02x}\n", std::vector(data.begin(), data.end()));
for (uint8_t ch : data) {
switch (ch) {
case 1: { // Battery level
//...
}
}
}
}
void initialize(bool enable = true) {
sleep_for(100ms);
m_port.open("COM5");
// Configure serial port settings
using P = asio::serial_port;
m_port.set_option(P::baud_rate(115200));
m_port.set_option(P::character_size(8));
m_port.set_option(P::parity(P::parity::none));
m_port.set_option(P::stop_bits(P::stop_bits::one));
m_port.set_option(P::flow_control(P::flow_control::none));
g_logger.info("Connected");
setEnabled(enable);
}
void setEnabled(bool enable) {
if (enable == !m_readoperation.expired())
return; // nothing to do
if (!enable)
m_port.cancel(); // cancels read operation (asio::error::operation_aborted)
else
ReadLoop();
}
void terminate() {
setEnabled(false);
g_logger.info("Bye!");
}
};
// demo code
int main() {
std::cout << std::fixed << std::setprecision(2);
g_logger.info("Demo start");
COM g_com;
g_com.initialize();
// g_com.terminate();
}
With some heavy testing, throwing loads and loads of random binary data at it, which results in "almost valid messages", but of course will rarely pass the checksum test, but a manual test message is accepted:
Interestingly, at the end there, it turns out a "valid" message was randomly generated :)
Process valid message: [0xf5, 0x82, 0x7, 0x94, 0x24, 0x6f, 0x4b, 0x5, 0xf5, 0x56, 0xf3, 0x72, 0x39, 0x3b, 0x97]