Thanks to whoever takes the time to help me with this. I'm writing a small program to operate a multirotor aircraft via MAVLink messages. The drone's autopilot broadcasts telemetry data via UDP and I, as a client, must listen to its messages and send some of my own. The problem I have is that, about half the time, my program does not appear to receive any packets from the drone. I know for sure that the problem is with my code, as I can see all the messages sent by the autopilot via UDP using Wireshark. Other software works without issues with the same autopilot on the same computer. The issue is somewhere in my app.
I have a UDPBase class that manages all of the UDP communication.
/// Base class that owns a UDP socket and drives asynchronous receives on a
/// caller-supplied io_service.
///
/// NOTE: doListen() also refers to receiveBuffer, maxBufferSize and
/// handleListen(), which are not declared in this excerpt.
class UDPBase
{
public:
    using IoContext = boost::asio::io_service; // backwards compatibility forces me to use an older boost
    using UDP = boost::asio::ip::udp;
    using Socket = UDP::socket;
    using Endpoint = UDP::endpoint;
    using Address = boost::asio::ip::address;
    using ErrorCode = boost::system::error_code;

public:
    /// Creates the socket on @p ioContext without a caller-chosen port.
    UDPBase(IoContext &ioContext);
    /// Creates the socket on @p ioContext; @p listenPort is the port to listen on.
    UDPBase(IoContext &ioContext, unsigned short int listenPort);
    virtual ~UDPBase();

    /// Re-initialises the socket and queues the first asynchronous receive.
    /// The signature takes the port so that it matches the out-of-class
    /// definition UDPBase::startListening(unsigned short int).
    virtual void startListening(unsigned short int listenPort);
    /// Runs the io_service event loop on the calling thread.
    void startIoContext();
    /// Closes the socket and stops the io_service.
    void stopIoContext();

protected:
    /// Closes, re-opens and binds the socket to listeningEndpoint.
    void init();
    /// Closes the socket.
    virtual void close();
    /// Queues one asynchronous receive.
    void doListen();
    /// Called with the outcome of every receive: error code, buffer and byte count.
    virtual void onDataReceived(const ErrorCode &ec, unsigned char *buffer, size_t bufferSize);

private:
    IoContext &ioContext;
    Socket socket;
    Endpoint listeningEndpoint;
    Endpoint destinationEndpoint;
};
`listeningEndpoint` is configured with the port I listen on. I omitted some setters and getters to reduce the size of the code.
// Builds a UDPBase on top of the caller's io_service. The socket is opened and
// bound immediately by init(), using the default-constructed listeningEndpoint.
UDPBase::UDPBase(IoContext &ioContext)
    : ioContext(ioContext),
      socket(ioContext)
{
    init();
}
// Builds a UDPBase that listens on `listenPort` (IPv4, any local address).
// listeningEndpoint is initialised in the member-initialiser list because
// init() binds the socket to it; without this the port argument would be
// ignored and the bind would use a default-constructed endpoint.
UDPBase::UDPBase(IoContext &ioContext, unsigned short int listenPort)
    : ioContext(ioContext),
      socket(ioContext),
      listeningEndpoint(UDP::v4(), listenPort)
{
    init();
}
// No explicit close() call is made on destruction.
UDPBase::~UDPBase() = default;
void UDPBase::startListening(unsigned short int listenPort)
{
init();
doListen();
}
// Closes the underlying UDP socket.
void UDPBase::close()
{
    socket.close();
}
// Runs the io_service event loop on the calling thread; the call blocks until
// run() returns.
void UDPBase::startIoContext()
{
    ioContext.run();
}
void UDPBase::stopIoContext()
{
ioContext.post([this]()
{
if (socket.is_open())
{
socket.close();
}
});
ioContext.stop();
}
void UDPBase::doListen()
{
socket.async_receive_from(
boost::asio::buffer(receiveBuffer, maxBufferSize), listeningEndpoint,
[this](ErrorCode ec, std::size_t bytesRecvd)
{
// std::cout << "Received from IP: " << listeningEndpoint.address().to_string() << " and Port: " << listeningEndpoint.port() << std::endl;
handleListen(ec);
onDataReceived(ec, receiveBuffer.data(), bytesRecvd);
doListen();
});
}
// Default receive callback.
//   ec         - result of the receive operation
//   buffer     - start of the received bytes
//   bufferSize - number of bytes received
// Logs receive errors and empty datagrams; non-empty payloads reach the
// decoding step at the end.
void UDPBase::onDataReceived(const ErrorCode &ec, unsigned char *buffer, size_t bufferSize)
{
    if (ec)
    {
        std::cout << "! Receive error: " << ec.message() << std::endl;
        return;
    }

    if (bufferSize == 0)
    {
        std::cout << "# No data received" << std::endl;
        return;
    }

    // decode buffer
}
void UDPBase::init()
{
close();
socket.open(UDP::v4());
boost::asio::socket_base::reuse_address option(true);
socket.set_option(option);
boost::system::error_code ec;
socket.bind(listeningEndpoint, ec);
if (ec)
{
std::cerr << "Failed to bind socket: " << ec.message() << std::endl;
throw std::runtime_error("Socket bind failed");
}
}
I inherit UDPBase in another class which handles message decoding and starts a thread running the ioContext.
// Launches the worker thread, which runs the io_service loop through
// startIoContext(). Nothing in this function queues a receive beforehand.
void MavlinkWrapper::start()
{
    auto runEventLoop = [this]
    { startIoContext(); };
    mAppThread = std::thread(runEventLoop);
}
// Asks the io_service to stop, then waits for the worker thread if one is
// running.
void MavlinkWrapper::stop()
{
    stopIoContext();

    if (!mAppThread.joinable())
    {
        return;
    }
    mAppThread.join();
}
main.cpp
#include "MavlinkWrapper/MavlinkWrapper.hpp"
// Demo driver: runs the wrapper for 20 seconds and then shuts it down.
int main()
{
    boost::asio::io_service ioContext;
    MavlinkWrapper mavlinkWrapper(ioContext, 14550); // listen port is forwarded to UDPBase to bind the socket to it

    mavlinkWrapper.start();

    // wait for the messages that the autopilot broadcasts and print some data
    const std::chrono::seconds listenWindow(20);
    std::this_thread::sleep_for(listenWindow);

    mavlinkWrapper.stop();
    return 0;
}
I suspect there might be a flaw in the way I either close the socket or join the thread at the end. Or maybe I am not correctly stopping the ioContext and the socket is left in a weird state. I am using Windows 10.
Making your code self-contained: https://coliru.stacked-crooked.com/a/d57170579882d1cf
Your startIoContext
void UDPBase::startIoContext() { ioContext.run(); } // quoted from the question: runs the io_service loop on the calling thread
may call `run()` before any work has been posted. This causes the thread to exit immediately. So let's call `startListening` BEFORE the thread starts:
// Queues the first receive before the worker thread exists, then starts the
// thread that runs the io_service loop.
void start() {
    startListening(listenPort);

    auto runEventLoop = [this] { startIoContext(); };
    mAppThread = std::jthread(runEventLoop);
}
This requires shuffling things around, because `listenPort` isn't available there.
Now, of course:
- `init()` in the constructor becomes entirely redundant and even potentially conflicting (unless the ports supplied are the same);
- your `async_receive_from` passes `listeningEndpoint` as the sender endpoint, which overwrites that endpoint with the sender's address on every receive...
Shuffling more things around makes it work for me:
But I'd probably simplify a lot, e.g. by realizing that `io_service` is thread-safe.
#include <boost/asio.hpp>
#include <iostream>
#include <thread>
// Shorthand aliases for the Boost.Asio UDP protocol, its endpoint type and the
// Boost.System error code used throughout this example.
using UDP = boost::asio::ip::udp;
using Endpoint = UDP::endpoint;
using ErrorCode = boost::system::error_code;
/// Minimal UDP receiver that owns its io_service, socket and worker thread.
///
/// The outcome of every receive (data or error) is forwarded to onDataReceived
/// when that callback is set.
struct MavlinkWrapper {
    /// Binds to @p listenPort and starts receiving; see the constructor definition.
    MavlinkWrapper(uint16_t listenPort);

    /// Stops the io_service before the members are torn down. Without this,
    /// destroying the object without a prior stop() would block forever in the
    /// jthread join, because ioc_.run() never returns on its own while a
    /// receive is pending.
    ~MavlinkWrapper() { stop(); }

    /// Callback signature: error code, pointer to the receive buffer, number of
    /// bytes received, and the sender's endpoint.
    using Handler = std::function<void(ErrorCode const&, unsigned char*, size_t, Endpoint)>;
    Handler onDataReceived{};

    /// Stops the io_service so that the worker's run() call returns.
    void stop() { ioc_.stop(); }

private:
    /// Queues one asynchronous receive that re-arms itself on completion.
    void receiveLoop();

    boost::asio::io_service ioc_; // backwards compatibility forces me to use an older boost
    UDP::socket socket_{ioc_, UDP::v4()};
    Endpoint listenEp_, remote_, destEp_;
    std::array<unsigned char, 1024> receiveBuffer;
    // Declared last so it is destroyed (and therefore joined) first, while the
    // buffer, endpoints, socket and io_service used by its handlers still exist.
    std::jthread thread_;
};
// Binds the socket to `listenPort` on a default-constructed address, queues the
// first receive, and only then starts the worker thread that runs the
// io_service.
MavlinkWrapper::MavlinkWrapper(uint16_t listenPort)
    : listenEp_{{}, listenPort} {
    UDP::socket::reuse_address const allowReuse(true);
    socket_.set_option(allowReuse);
    socket_.bind(listenEp_); // throws system_error if bind fails

    receiveLoop();

    auto runEventLoop = [this] { ioc_.run(); };
    thread_ = std::jthread(runEventLoop);
}
void MavlinkWrapper::receiveLoop() {
socket_.async_receive_from(boost::asio::buffer(receiveBuffer), remote_,
[this](ErrorCode ec, std::size_t bytesRecvd) {
if (onDataReceived)
onDataReceived(ec, receiveBuffer.data(), bytesRecvd, remote_);
receiveLoop();
});
}
#include <print>
using namespace std::chrono_literals;
int main() try {
MavlinkWrapper w{14550};
w.onDataReceived = [](ErrorCode const& ec, unsigned char* buffer, size_t bufferSize, Endpoint ep) {
if (ec) {
std::cout << "onDataReceived ! error: " << ec.message() << std::endl;
return;
}
if (bufferSize) {
auto msg = std::string(reinterpret_cast<char const*>(buffer), bufferSize);
while (msg.ends_with("\n"))
msg.pop_back();
std::println("onDataReceived {} bytes from {}:{} {}", //
bufferSize, //
ep.address().to_string(), //
ep.port(), //
msg);
} else {
std::cout << "# No data received" << std::endl;
}
};
// wait for the messages that the autopilot broadcasts and print some data
std::this_thread::sleep_for(20s);
w.stop();
} catch (std::exception const& e) {
std::cerr << "Exception: " << e.what() << std::endl;
return 1;
}
Only half the code and far fewer complications.