c++boost-asio

boost::asio::async_read_until pulls too much from socket


I'm trying to receive messages in the format

<size> <data>

Where:

A sample message is:

19 {"command":"start"}

Here is the class I have to parse it:

class Command : {
public:
    Command(boost::asio::ip::tcp::socket& socket) : _socket(socket) {}
    std::future<boost::property_tree::ptree> Receive();

private:
    void OnSize(const std::error_code&, size_t);
    void OnData(const std::error_code&, size_t);

    boost::asio::ip::tcp::socket&              _socket;
    boost::asio::streambuf                     _buffer;
    std::promise<boost::property_tree::ptree>  _promise;
}

First I need the . I call async_read_until(...,' ',...) to pull data upto (and including) the whitespace. This all gets stored into a boost::asio::streambuf:

std::future<boost::property_tree::ptree> 
Command::Receive() {
    boost::asio::async_read_until(_socket, _buffer, ' ', 
        std::bind(&Command::OnSize, this, _1, _2)
    );
    return _promise.get_future();
}

While this is blocking, I then use netcat to send the sample message:

$ netcat 127.0.0.1 14652
19 {"command":"start"}

If I use wireshark to inspect, that whole message is sent in a single TCP frame.

The handler OnSize() is invoked as expected, with bytes_received = 3. That makes sense because 2 bytes for <size> plus 1 byte for the whitespace. Then I expect that I just need to transfer_exactly(data_size) to read the rest.

void Command::OnSize(const std::error_code& ec, std::size_t bytes_received) {

    std::size_t data_size;
    std::istream is(&_buffer);
    is >> data_size;  // automatically consumes the digits from the buffer
        
    // consume the whitespace
    _buffer.consume(1);
    
    boost::asio::async_read(
        _socket, _buffer, boost::asio::transfer_exactly(data_size),
        std::bind(&Command::OnData, this, _1, _2)
    );
}

Next I call async_read(..., transfer_exactly(data_size), ...), expecting to have OnData() called with bytes_received == data_size. However, I find that it blocks until I trigger another TCP frame of at least data_size by spamming random data, and that's a problem because I'm not expecting more data.

void Command::OnData(const std::error_code& ec, std::size_t bytes_received) {
    std::istream is(&m_buffer);
    boost::property_tree::ptree pt;
    boost::property_tree::json_parser::read_json(is, pt);

    _promise.set_value(pt);
}

What's interesting is that _buffer.size() == 23 at the start of OnSize() and _buffer.size() == 20 right before the async_read. That means the rest of the data has actually already been written into the buffer. What should I do about this?


Solution

  • This is by design:

    This operation is implemented in terms of zero or more calls to the stream's async_read_some function, and is known as a composed operation. If the dynamic buffer sequence's get area already contains the delimiter, this asynchronous operation completes immediately.

    Because the buffer is dynamic, the read will always just get what the underlying IP stack delivers (depending on any fragmentation in the hardware and any accumulation on the sending side (Nagle)).

    It follows that your Command reader should not own the buffer. Buffers are always per-stream. The next Command reader should be able to read any excess data read with the previous command.

    Here's how I might implement what you describe:

    #include <boost/asio.hpp>
    #include <boost/json.hpp>
    #include <iostream>
    #include <syncstream>
    
    namespace asio = boost::asio;
    using tcp      = asio::ip::tcp;
    namespace json = boost::json;
    
    static auto out() { return std::osyncstream(std::cout); }
    
    // text wire protocol
    // receive commands structured like
    //
    //    <length> " " <json>
    //
    // where <length> is the length of the json in bytes
    
    template <typename DynamicBuffer, typename CompletionToken>
    auto async_read_command(tcp::socket& s, DynamicBuffer buf, CompletionToken&& token) {
        using boost::system::error_code;
    
        return asio::async_initiate<CompletionToken, void(error_code, json::value)>(
            [buf](auto handler, auto& s) mutable {
                auto read_length = [&s, buf, handler = std::move(handler)](error_code ec, size_t n) mutable {
                    auto b = asio::buffers_begin(buf.data());
                    // out() << "read length: " << ec.message() << " " << n << " " << quoted(std::string(b, b + buf.size())) << std::endl;
                    if (ec)
                        return handler(ec, json::value{});
    
                    size_t length = 0;
                    try {
                        length = std::stoul(std::string(b, b + n - 1));
                    } catch (...) {
                        return std::move(handler)(error_code{asio::error::invalid_argument}, json::value{});
                    }
                    buf.consume(n);
    
                    auto expectation = [buf, length](error_code const&, size_t) {
                        return buf.size() >= length ? 0 : length - buf.size();
                    };
    
                    asio::async_read( //
                        s, buf, expectation,
                        [buf, handler = std::move(handler), length](error_code ec, size_t /*n*/) mutable {
                            if (ec)
                                return std::move(handler)(ec, json::value{});
    
                            if (buf.size() < length)
                                return std::move(handler)(error_code{asio::error::no_buffer_space}, json::value{});
    
                            auto        b = asio::buffers_begin(buf.data());
                            std::string payload(b, b + length);
                            buf.consume(length);
    
                            auto jv = json::parse(payload);
    
                            std::move(handler)(error_code{}, jv);
                        });
                };
    
                asio::async_read_until(s, buf, ' ', read_length);
            },
            token, s);
    }
    
    int main() {
        asio::thread_pool io(1);
        tcp::acceptor     a(io, {{}, 8989});
    
        for (;;) {
            tcp::socket s = a.accept();
    
            try {
                for (std::string buffer;;) {
                    auto jv = async_read_command(s, asio::dynamic_buffer(buffer), asio::use_future);
                    out() << "Received: " << jv.get() << std::endl;
                }
            } catch (std::exception const& e) {
                out() << "Error: " << e.what() << std::endl;
            }
        }
    }
    

    There are various ways to write composed operations more elegant, especially if C++20 coroutines are an option, but this should work generically. E.g. with asio::use_future as a completion token:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/json.hpp>
    #include <iostream>
    #include <syncstream>
    
    namespace asio = boost::asio;
    using tcp      = asio::ip::tcp;
    namespace json = boost::json;
    
    static auto out() { return std::osyncstream(std::cout); }
    
    // text wire protocol
    // receive commands structured like
    //
    //    <length> " " <json>
    //
    // where <length> is the length of the json in bytes
    
    template <typename DynamicBuffer, typename CompletionToken>
    auto async_read_command(tcp::socket& s, DynamicBuffer buf, CompletionToken&& token) {
        using boost::system::error_code;
    
        return asio::async_initiate<CompletionToken, void(error_code, json::value)>(
            [buf](auto handler, auto& s) mutable {
                auto read_length = [&s, buf, handler = std::move(handler)](error_code ec, size_t n) mutable {
                    auto b = asio::buffers_begin(buf.data());
                    // out() << "read length: " << ec.message() << " " << n << " " << quoted(std::string(b, b + buf.size())) << std::endl;
                    if (ec)
                        return handler(ec, json::value{});
    
                    size_t length = 0;
                    try {
                        length = std::stoul(std::string(b, b + n - 1));
                    } catch (...) {
                        return std::move(handler)(error_code{asio::error::invalid_argument}, json::value{});
                    }
                    buf.consume(n);
    
                    auto expectation = [buf, length](error_code const&, size_t) {
                        return buf.size() >= length ? 0 : length - buf.size();
                    };
    
                    asio::async_read( //
                        s, buf, expectation,
                        [buf, handler = std::move(handler), length](error_code ec, size_t /*n*/) mutable {
                            if (ec)
                                return std::move(handler)(ec, json::value{});
    
                            if (buf.size() < length)
                                return std::move(handler)(error_code{asio::error::no_buffer_space}, json::value{});
    
                            auto        b = asio::buffers_begin(buf.data());
                            std::string payload(b, b + length);
                            buf.consume(length);
    
                            auto jv = json::parse(payload);
    
                            std::move(handler)(error_code{}, jv);
                        });
                };
    
                asio::async_read_until(s, buf, ' ', read_length);
            },
            token, s);
    }
    
    int main() {
        asio::thread_pool io(1);
        tcp::acceptor     a(io, {{}, 8989});
    
        for (;;) {
            tcp::socket s = a.accept();
    
            try {
                for (std::string buffer;;) {
                    auto jv = async_read_command(s, asio::dynamic_buffer(buffer), asio::use_future);
                    out() << "Received: " << jv.get() << std::endl;
                }
            } catch (std::exception const& e) {
                out() << "Error: " << e.what() << std::endl;
            }
        }
    }
    

    Test with e.g.

    nc localhost 8989 <<< '19 {"command":"start"}30 {"command":"start ABCDEFGIJK"}17 {"command":"end"}'
    

    Prints:

    Received: {"command":"start"}
    Received: {"command":"start ABCDEFGIJK"}
    Received: {"command":"end"}
    Received: Error: End of file [asio.misc:2 at /home/sehe/custom/boost/boost/asio/detail/reactive_socket_recv_op.hpp:133:5 in function 'static void boost::asio::detail::reactive_socket_recv_op<MutableBufferSequence, Handler, IoExecutor>::do_complete(void*, boost::asio::detail::operation*, const boost::system::error_code&, std::size_t)']
    

    Uncomment some of the debug trace statements to see that, typically, all three commands arrive in one packet, or two.

    Out Of The Box

    Note I used Boost JSON, because Property Tree is NOT a JSON library. You could use the streaming parser and perhaps avoid the framing at all: boost::asio::async_read_until with custom match_char to accept only JSON format