c++ boost-asio

UDP client using Boost does not always receive packets, though I can see them in Wireshark


Thanks to whoever takes the time to help me with this. I'm writing a small program to operate a multirotor aircraft via MAVLink messages. The drone's autopilot broadcasts telemetry data via UDP, and I, as a client, must listen to its messages and send some of my own. The problem I have is that, about half the time, my program does not appear to receive any packets from the drone. I know for sure that the problem is in my code, as I can see all the messages sent by the autopilot via UDP using Wireshark, and other software works without issues with the same autopilot on the same computer.

I have a UDPBase class that manages all of the UDP communication.

class UDPBase
{
public:
    using IoContext = boost::asio::io_service; // backwards compatibility forces me to use an older boost
    using UDP = boost::asio::ip::udp;
    using Socket = UDP::socket;
    using Endpoint = UDP::endpoint;
    using Address = boost::asio::ip::address;
    using ErrorCode = boost::system::error_code;

public:
    UDPBase(IoContext &ioContext);
    UDPBase(IoContext &ioContext, unsigned short int listenPort);
    virtual ~UDPBase();

    virtual void startListening();

    void startIoContext();
    void stopIoContext();

protected:
    void init();
    virtual void close();
    void doListen();
    virtual void onDataReceived(const ErrorCode &ec, unsigned char *buffer, size_t bufferSize);

private:

    IoContext &ioContext;
    Socket socket;
    Endpoint listeningEndpoint;
    Endpoint destinationEndpoint;
};

listeningEndpoint is configured with the port I listen on. I omitted some setters and getters to reduce the size of the code.

UDPBase::UDPBase(IoContext &ioContext) : ioContext(ioContext), socket(ioContext)
{
    init();
}

UDPBase::UDPBase(IoContext &ioContext, unsigned short int listenPort) : ioContext(ioContext), socket(ioContext)
{
// set listenEndpoint based on the port
   init();
}

UDPBase::~UDPBase()
{
    // close();
}

void UDPBase::startListening(unsigned short int listenPort)
{
    init();
    doListen();
}

void UDPBase::close()
{
    socket.close();
}

void UDPBase::startIoContext()
{
    ioContext.run();
}

void UDPBase::stopIoContext()
{
    ioContext.post([this]()
    {
        if (socket.is_open())
        {
            socket.close();
        } 
    });

    ioContext.stop();
}

void UDPBase::doListen()
{
    socket.async_receive_from(
        boost::asio::buffer(receiveBuffer, maxBufferSize), listeningEndpoint,
        [this](ErrorCode ec, std::size_t bytesRecvd)
        {
            // std::cout << "Received from IP: " << listeningEndpoint.address().to_string() << " and Port: " << listeningEndpoint.port() << std::endl;
            handleListen(ec);
            onDataReceived(ec, receiveBuffer.data(), bytesRecvd);
            doListen();
        });
}

void UDPBase::onDataReceived(const ErrorCode &ec, unsigned char *buffer, size_t bufferSize)
{
    if (ec)
    {
        std::cout << "! Receive error: " << ec.message() << std::endl;
        return;
    }

    if (bufferSize)
    {
        // decode buffer
    }
    else
    {
        std::cout << "# No data received" << std::endl;
    }
}

void UDPBase::init()
{
    close();
    socket.open(UDP::v4());
    boost::asio::socket_base::reuse_address option(true);
    socket.set_option(option);
    boost::system::error_code ec;
    socket.bind(listeningEndpoint, ec);
    if (ec)
    {
        std::cerr << "Failed to bind socket: " << ec.message() << std::endl;
        throw std::runtime_error("Socket bind failed");
    }
}

I inherit UDPBase in another class which handles message decoding and starts a thread running the ioContext.

void MavlinkWrapper::start()
{
    mAppThread = std::thread([this]()
                             { startIoContext(); });
}

void MavlinkWrapper::stop()
{
    stopIoContext();
    if (mAppThread.joinable()) 
    {
        mAppThread.join();
    }
}

main.cpp

#include "MavlinkWrapper/MavlinkWrapper.hpp"

int main()
{
    boost::asio::io_service ioContext;
    MavlinkWrapper mavlinkWrapper(ioContext, 14550); // listen port is forwarded to UDPBase to bind the socket to it

    mavlinkWrapper.start();
    std::this_thread::sleep_for(std::chrono::seconds(20)); // wait for the messages that the autopilot broadcasts and print some data
    mavlinkWrapper.stop();

    return 0;
}

I suspect there might be a flaw in the way I either close the socket or join the thread at the end. Or maybe I am not correctly stopping the ioContext and the socket is left in a weird state. I am using Windows 10.


Solution

  • Making your code self-contained: https://coliru.stacked-crooked.com/a/d57170579882d1cf

    1. Your startIoContext

      void UDPBase::startIoContext() { ioContext.run(); }
      

      may call run() before any work has been posted. With nothing queued, run() returns immediately and the thread exits. So let's call startListening BEFORE the thread starts:

      void start() {
           startListening(listenPort);
           mAppThread = std::jthread([this]() { startIoContext(); });
      }
      

      Shuffling things around because listenPort isn't available there.

      Now, of course, init() in the constructor becomes entirely redundant and even potentially conflicting (unless the ports supplied are the same).

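    2. Your async_receive_from passes listeningEndpoint as the sender-endpoint argument. That argument is an output parameter: each completed receive overwrites it with the address and port of whoever sent the datagram, so listeningEndpoint no longer holds the endpoint you meant to bind to, and any later init() would try to bind to the sender's address instead.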

    Shuffling more things around makes it work for me:

    Live On Coliru

    But I'd probably simplify a lot. E.g. by realizing io_service is thread-safe.

    Live On Coliru

    #include <boost/asio.hpp>
    #include <array>
    #include <functional>
    #include <iostream>
    #include <string>
    #include <thread>
    using UDP       = boost::asio::ip::udp;
    using Endpoint  = UDP::endpoint;
    using ErrorCode = boost::system::error_code;
    
    struct MavlinkWrapper {
        MavlinkWrapper(uint16_t listenPort);
    
        using Handler = std::function<void(ErrorCode const&, unsigned char*, size_t, Endpoint)>;
        Handler onDataReceived{};
    
        void stop() { ioc_.stop(); }
      private:
        void receiveLoop();
    
        boost::asio::io_service ioc_; // backwards compatibility forces me to use an older boost
        UDP::socket             socket_{ioc_, UDP::v4()};
        Endpoint                listenEp_, remote_, destEp_;
        std::jthread            thread_;
    
        std::array<unsigned char, 1024> receiveBuffer;
    };
    
    MavlinkWrapper::MavlinkWrapper(uint16_t listenPort) : listenEp_{{}, listenPort} {
        socket_.set_option(UDP::socket::reuse_address(true));
    
        socket_.bind(listenEp_); // throws system_error if bind fails
        receiveLoop();
    
        thread_ = std::jthread([this]() { ioc_.run(); });
    }
    
    void MavlinkWrapper::receiveLoop() {
        socket_.async_receive_from(boost::asio::buffer(receiveBuffer), remote_,
                                   [this](ErrorCode ec, std::size_t bytesRecvd) {
                                       if (onDataReceived)
                                           onDataReceived(ec, receiveBuffer.data(), bytesRecvd, remote_);
                                       receiveLoop();
                                   });
    }
    
    #include <print>
    using namespace std::chrono_literals;
    
    int main() try {
        MavlinkWrapper w{14550};
        w.onDataReceived = [](ErrorCode const& ec, unsigned char* buffer, size_t bufferSize, Endpoint ep) {
            if (ec) {
                std::cout << "onDataReceived ! error: " << ec.message() << std::endl;
                return;
            }
    
            if (bufferSize) {
                auto msg = std::string(reinterpret_cast<char const*>(buffer), bufferSize);
                while (msg.ends_with("\n"))
                    msg.pop_back();
    
                std::println("onDataReceived {} bytes from {}:{} {}", //
                             bufferSize,                              //
                             ep.address().to_string(),                //
                             ep.port(),                               //
                             msg);
            } else {
                std::cout << "# No data received" << std::endl;
            }
        };
    
        // wait for the messages that the autopilot broadcasts and print some data
        std::this_thread::sleep_for(20s);
        w.stop();
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }
    

    Only half the code and far fewer complications.