I'm working on a cross-platform UDP async server with Boost.Asio, and got the following assertion error when running a build compiled with MSVC (Visual Studio 2022):
File: <visual studio path>/include/vector
Line: 54
Expression: can't dereference invalidated vector iterator
When building the same code on Linux with GCC, all is fine.
Here's a minimal example of my server :
#include <chrono>
#include <cstdint>
#include <format>
#include <iostream>
#include <thread>
#include <vector>
#include <sdkddkver.h>
#include <boost/asio.hpp>
namespace asio = boost::asio;
using asio::ip::udp;
using asio::ip::port_type;
// UDP server as posted in the question. It owns its io_context, a socket bound
// to `port`, and a background thread that runs the context. Every datagram
// whose first byte is 0xFF is answered with a single 0xFF byte.
class Server {
public:
asio::io_context context{};
const port_type port;
private:
// Receive buffer handed to every async_receive_from call.
std::array<char, 4'096> receivedData{};
udp::socket socket;
std::mutex mutex{};
std::atomic<bool> isRunning{ true };
// Worker thread: calls context.run() in a loop for as long as isRunning is set.
// A std::exception caught here is printed and terminates the whole process.
// NOTE(review): as a member with a default initializer, the thread starts before
// the constructor body queues the first receive, so the first run() may find no
// work — confirm how io_context::run behaves in that case.
std::thread IO_Thread{
[this] {
try {
while (this->isRunning.load()) {
this->context.run();
}
} catch (std::exception& error) {
std::cerr << error.what() << std::endl;
std::exit(1);
}
}
};
// Starts one asynchronous receive into receivedData; the completion handler
// processes the datagram and then calls startReceiving() again.
void startReceiving() {
// Function-local static: the same endpoint object is passed to every receive
// and is also the object handed by reference to send() from the handler.
static udp::endpoint clientIP{};
this->socket.async_receive_from(
asio::buffer(this->receivedData, 4'096),
clientIP,
[this] (std::error_code error, std::size_t bytesReceived) {
if (error) {
// Presumably propagates out of context.run() into the worker thread's catch block.
throw std::runtime_error{ std::format("Error while receiving data ({} byte(s)): {}", bytesReceived, error.message()) };
}
std::lock_guard lock{ this->mutex };
// Copy of the bytes that were just received.
const std::vector<std::uint8_t> buf{ this->receivedData.cbegin(), this->receivedData.cbegin() + bytesReceived };
// NOTE(review): buf is empty when bytesReceived is 0, and buf[0] is then out of range.
if (buf[0] == 0xFF) {
// The vector built from { 0xFF } is a temporary that only lives until send() returns.
this->send(clientIP, { 0xFF });
}
this->receivedData.fill(0);
this->startReceiving();
}
);
}
public:
// Binds the socket to `port` on all IPv4 interfaces and queues the first receive.
Server(port_type port) :
port{ port },
socket{ this->context, udp::endpoint{ udp::v4(), port } }
{
this->startReceiving();
};
// Clears the run flag and joins the worker thread.
// NOTE(review): nothing here stops the context or closes the socket — confirm
// that run() can return while a receive is still pending.
~Server() {
this->isRunning.store(false);
if (this->IO_Thread.joinable()) {
this->IO_Thread.join();
}
}
// Queues an asynchronous send of `msg` to `clientIP`.
// NOTE(review): asio::buffer(msg) is built from the caller's vector, not from the
// copy of `msg` captured by the handler, so the caller's vector has to stay alive
// until the operation completes.
void send(const udp::endpoint& clientIP, const std::vector<std::uint8_t>& msg) {
this->socket.async_send_to(
asio::buffer(msg),
clientIP,
[this, msg] (std::error_code error, std::size_t bytesSent) {
if (error) {
throw std::runtime_error{ std::format("Error while sending {} byte(s) of data ({} byte(s) effectively sent): {}", msg.size(), bytesSent, error.message()) };
}
}
);
}
};
// Entry point of the question's server: listens on UDP port 5000 for four
// seconds, then lets the Server destructor run.
int main(int argc, char* argv[]) {
    Server server{ 5'000 };

    // The server does its work on its own thread; this thread only waits.
    const std::chrono::seconds lifetime{ 4 };
    std::this_thread::sleep_for(lifetime);
}
When reducing my actual code, I managed to accurately locate the issue :
// this raises an assertion error
const std::vector<std::uint8_t> buf{ this->receivedData.cbegin(), this->receivedData.cbegin() + bytesReceived };
if (buf[0] == 0xFF) {
this->send(clientIP, { 0xFF });
}
// but this doesn't
if (this->receivedData[0] == 0xFF) {
this->send(clientIP, { 0xFF });
}
I saw that `std::vector` isn't thread-safe, but I struggle to understand why it is an issue here, as the vector isn't shared between multiple threads. Also, I don't really understand why I don't get any issue on Linux.
Here's a CMakeLists.txt extract :
# Locate Boost through its CMake package configuration; the asio component is mandatory.
find_package(Boost COMPONENTS asio CONFIG REQUIRED)
# Windows only: link the Winsock libraries.
if (WIN32)
target_link_libraries(asio_udp PRIVATE wsock32)
target_link_libraries(asio_udp PRIVATE Ws2_32)
endif ()
target_link_libraries(asio_udp PRIVATE Boost::asio) # Boost.Asio is statically linked
# Require at least C++20 for this target.
target_compile_features(asio_udp PRIVATE cxx_std_20)
...
elseif (${CMAKE_CXX_COMPILER_ID} STREQUAL MSVC)
# Warning C4464 warns about . and .. in #include directive
target_compile_options(asio_udp PRIVATE /W3 /external:anglebrackets /external:W0 /wd4464)
set_property(TARGET asio_udp PROPERTY MSVC_RUNTIME_LIBRARY "MultiThreadedDebug")
else ()
...
In case it matters, this is my reduced client as well :
#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <exception>
#include <format>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <sdkddkver.h>
#include <boost/asio.hpp>
namespace asio = boost::asio;
using asio::ip::udp;
using asio::ip::port_type;
// UDP client as posted in the question. It owns its io_context and a background
// thread that runs it, resolves the server address in the constructor, sends an
// initial datagram and then keeps an asynchronous receive pending.
class Client {
public:
asio::io_context context{};
private:
// Receive buffer handed to every async_receive_from call.
std::array<char, 4'096> receivedData{};
std::mutex mutex{};
// Worker thread: calls context.run() in a loop for as long as isRunning is set.
// A std::exception caught here is only printed.
// NOTE(review): this member is declared before `socket` and `isRunning`, and
// members are initialized in declaration order, so the thread starts (and reads
// isRunning) before those members are initialized.
std::thread IO_Thread{
[this] {
try {
while (this->isRunning.load()) {
this->context.run();
}
} catch (std::exception& error) {
std::cerr << error.what() << std::endl;
}
}
};
udp::resolver resolver{ this->context };
udp::socket socket;
udp::endpoint endpoint;
udp::endpoint serverEndpoint;
std::atomic<bool> isRunning{ true };
// Starts one asynchronous receive of up to 4'095 bytes; the completion handler
// re-arms it. serverEndpoint is passed as the endpoint argument of the receive.
void startReceiving() {
this->socket.async_receive_from(
asio::buffer(this->receivedData, 4'095),
this->serverEndpoint,
[this] (std::error_code error, std::size_t bytesReceived) {
if (error) {
throw std::runtime_error{ std::format("Error while receiving {} byte(s) of data: {}", bytesReceived, error.message()) };
}
this->startReceiving();
}
);
}
public:
// Resolves serverHostName:serverPort over IPv4, opens the socket, performs a
// blocking send_to of the initial message and queues the first receive.
Client(const std::string& serverHostName, port_type serverPort) :
socket{ this->context },
endpoint{},
serverEndpoint{ *this->resolver.resolve(udp::v4(), serverHostName, std::to_string(serverPort)).begin() }
{
this->socket.open(udp::v4());
// NOTE(review): a braced list is passed straight to asio::buffer — confirm which
// overload is selected and what the resulting buffer actually refers to.
this->socket.send_to(asio::buffer({ 0xFF }), this->serverEndpoint);
this->startReceiving();
}
// Calls send({ 0x00 }), clears the run flag, stops the context while holding
// the mutex, then joins the worker thread.
~Client() {
this->send({ 0x00 }); // disconnect
this->isRunning.store(false);
if (this->IO_Thread.joinable()) {
this->mutex.lock();
this->context.stop(); // discarding potential pending async tasks
this->mutex.unlock();
this->IO_Thread.join();
}
}
// NOTE(review): despite its name, this function never uses `str` and issues no
// send; its body starts another async_receive_from, exactly like startReceiving().
void send(const std::vector<std::uint8_t>& str) {
this->socket.async_receive_from(
asio::buffer(this->receivedData, 4'095),
this->serverEndpoint,
[this] (std::error_code error, std::size_t bytesReceived) {
if (error) {
throw std::runtime_error{ std::format("Error while receiving {} byte(s) of data: {}", bytesReceived, error.message()) };
}
this->startReceiving();
}
);
}
};
// Entry point of the question's client: connects to 127.0.0.1:5000, stays
// alive for four seconds, then lets the Client destructor run.
int main() {
    Client client{ "127.0.0.1", 5'000 };

    // The client does its work on its own thread; this thread only waits.
    const std::chrono::seconds lifetime{ 4 };
    std::this_thread::sleep_for(lifetime);
}
There is UB here:
socket.async_send_to(
asio::buffer(msg), clientIP
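`msg` is a local stack variable: the buffer refers to the caller's temporary vector, which is destroyed before the asynchronous send completes. `clientIP` refers to the same static variable that is also used for the next `async_receive_from`.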
The mutex guarding `io_context::stop` is useless (`io_context` is thread-safe).
The mutex around the buffer handling in the `async_receive_from` completion handler is also questionable: there's only one IO thread, and the completion handler always runs on it.
The `send` function on the `Client` does... not send.
The receive operation on the `Client` overwrites the server endpoint.
The following call does NOT do what you think it does:
asio::buffer({0xFF})
See it for yourself: https://coliru.stacked-crooked.com/a/d73a3dad5a08a31f
Don't "fix" it using `{'\xFF'}` - that's still UB (the buffer points to a temporary). Use e.g. `asio::buffer("\xFF", 1)` instead.
Instead of mutexes, consider a strand - you already have a logical strand, since only the IO thread drives the program; main is just waiting.
For bonus points, don't aggregate threads or contexts at all. Instead just pass an executor, which lets the user decide on strands/threads/who runs the service.
Adding a minimal amount of logging so we can see the traffic going both ways:
File server.cpp
#include <boost/asio.hpp>
#include <format>
#include <iostream>
#include <fmt/ranges.h>
#include <boost/lexical_cast.hpp>
namespace asio = boost::asio;
using asio::ip::udp;
using asio::ip::port_type;
/// Minimal asynchronous UDP server.
///
/// Every datagram whose first byte is 0xFF is answered with a single 0xFF byte
/// sent back to the datagram's sender. The server owns neither a thread nor an
/// io_context: all handlers run on the executor passed to the constructor.
class Server {
    std::array<char, 4'096> incoming_{}, outgoing_{};
    udp::socket socket_;
    udp::endpoint peer_; // sender of the datagram currently being handled

    /// Queues one asynchronous receive. On success the handler logs the
    /// datagram, replies if required and re-arms the receive; on error it logs
    /// and lets the receive loop end.
    void startReceiving() {
        socket_.async_receive_from(asio::buffer(incoming_), peer_, [this](std::error_code error, size_t n) {
            if (error) {
                std::cerr << std::format("Error receiving ({} byte(s) effectively received): {}", n,
                                         error.message())
                          << '\n';
                return; // deliberately not re-arming the receive
            }

            std::string_view buf(incoming_.data(), n); // view only, not copying
            fmt::print("Received {::#02x} from {}\n", std::span(buf),
                       boost::lexical_cast<std::string>(peer_));

            // starts_with is also well-defined for an empty datagram, unlike buf[0].
            // The char literal makes the comparison against the byte 0xFF explicit.
            if (buf.starts_with('\xFF'))
                send(peer_, {0xFF});

            startReceiving();
        });
    }

public:
    /// @param ex   executor on which all socket operations and handlers run
    /// @param port local UDP port to bind on all IPv4 interfaces
    Server(asio::any_io_executor ex, port_type port) //
        : socket_{ex, udp::endpoint{udp::v4(), port}} {
        startReceiving();
    }

    /// Queues an asynchronous send of `msg` to `peer`.
    ///
    /// The payload is copied into the member buffer `outgoing_`, so `msg` may be
    /// a temporary. A payload larger than the buffer trips the assertion (and is
    /// truncated when assertions are disabled).
    /// NOTE(review): `outgoing_` is reused by every call, so a second send()
    /// issued before the previous one completes overwrites its payload.
    void send(udp::endpoint peer, std::vector<uint8_t> const& msg) {
        assert(msg.size() <= outgoing_.size()); // a payload that exactly fills the buffer is fine
        size_t n = std::min(outgoing_.size(), msg.size());
        std::copy_n(msg.begin(), n, outgoing_.begin());

        socket_.async_send_to(
            asio::buffer(outgoing_, n), peer, [n](std::error_code error, size_t sent) {
                if (error)
                    std::cerr << std::format(
                                     "Error while sending {} byte(s) of data ({} byte(s) effectively sent): {}", n,
                                     sent, error.message())
                              << '\n';
            });
    }
};
// Runs the server on the calling thread for four seconds.
int main() {
    asio::io_context ioc;
    Server server{ioc.get_executor(), 5'000};

    const std::chrono::seconds lifetime{4};
    ioc.run_for(lifetime);
}
File client.cpp
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <fmt/ranges.h>
#include <format>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::port_type;
using asio::ip::udp;
/// Minimal asynchronous UDP client.
///
/// The constructor resolves the server address, sends one 0xFF byte with a
/// blocking call and starts a receive loop that logs every incoming datagram.
/// All asynchronous work runs on the executor passed to the constructor.
class Client {
    std::array<char, 4'096> incoming{};
    std::array<char, 4'096> outgoing{};
    udp::socket socket;
    udp::resolver resolver{socket.get_executor()};
    udp::endpoint peer;         // receives the sender of each incoming datagram
    udp::endpoint const server; // destination only; never overwritten by a receive

    /// Queues one asynchronous receive; the handler logs the datagram and
    /// re-arms the receive, or logs the error and lets the loop end.
    void startReceiving() {
        auto onReceive = [this](std::error_code error, size_t n) {
            if (!error) {
                fmt::print("Received {::#02x} from {}\n", std::span(incoming).subspan(0, n),
                           boost::lexical_cast<std::string>(peer));
                startReceiving();
                return;
            }

            // Not re-arming the receive: once the loop ends, the program ends
            // too unless a write operation is still in flight.
            std::cerr << std::format("Error while receiving {} byte(s) of data: {}", n,
                                     error.message())
                      << std::endl;
        };

        this->socket.async_receive_from(asio::buffer(incoming), peer, onReceive);
    }

public:
    /// @param ex             executor on which all socket operations run
    /// @param serverHostName host name or address of the server (resolved over IPv4)
    /// @param serverPort     UDP port of the server
    Client(asio::any_io_executor ex, std::string const& serverHostName, port_type serverPort)
        : socket{ex, udp::v4()}
        , server{*resolver.resolve(udp::v4(), serverHostName, std::to_string(serverPort)).begin()} {
        socket.send_to(asio::buffer("\xFF", 1), server);
        startReceiving();
    }

    /// Cancels the pending receive, then queues the one-byte disconnect message.
    void shutdown() {
        socket.cancel(); // cancel read
        send({0x00});    // disconnect
    }

    /// Queues an asynchronous send of `str` to the server. The payload is
    /// copied into the member buffer `outgoing` first, so `str` may be a
    /// temporary.
    void send(std::vector<std::uint8_t> const& str) {
        assert(str.size() <= outgoing.size());
        auto const count = std::min(str.size(), outgoing.size());
        std::copy_n(str.begin(), count, outgoing.begin());

        socket.async_send_to( //
            asio::buffer(outgoing, count), server, [count](std::error_code error, size_t /*sent*/) {
                if (!error)
                    return;
                std::cerr << std::format("Error sending {} byte(s) of data: {}", count, error.message())
                          << std::endl;
            });
    }
};
// Runs the client on the calling thread for two seconds, then shuts it down
// and keeps running the context until the shutdown work has completed.
int main() {
    asio::io_context ioc;
    Client client{ioc.get_executor(), "127.0.0.1", 5'000};

    const std::chrono::seconds activePeriod{2};
    ioc.run_for(activePeriod);

    client.shutdown();
    ioc.run(); // complete shutdown operation
}