c++network-programmingc++17boost-asioasio

async_write only sends after the server is closed


I'm trying to send messages via async_write, but they are only sent after I shutdown the server ( ctrl-c)

For example: as client I send "test" and "test2", and only after closing the server client recieves "testtest2"

I'm making a chat that accepts a message from a user (successfully) and has to broadcast it to everyone

message sending code

void Server::writeHandler(int id, boost::system::error_code error){
    if (!error){
        std::cout << "[DEBUG] message broadcasted ";
    } else {
        close_connection(id);
    }
}
void Server::broadcast(std::string msg, boost::system::error_code error){
    for (auto& user : m_users){ // for every user in unordered map of users
        asio::async_write(user.second->socket, asio::buffer(msg, msg.size()),
            std::bind(&Server::writeHandler, this, user.first, std::placeholders::_1));
    }
}

broadcast calls in onMessage

void Server::onMessage(int id, boost::system::error_code error){
    if (!error){
        broadcast(m_read_msg, error); // char m_read_msg[PACK_SIZE] // PACK_SIZE = 512
        asio::async_read(m_users[id].get()->socket, asio::buffer(m_read_msg, PACK_SIZE), // PACK_SIZE = 512
            std::bind(&Server::onMessage, this, id, std::placeholders::_1));
    } else {
        close_connection(id);
    }
}

server run function:

void Server::run(int port){
    asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
    std::cout << "[DEBUG] binded on " << port << std::endl;

    // acceptor initialization
    m_acceptor = std::make_shared<asio::ip::tcp::acceptor>(m_io, endpoint);
    
    // start to listen for connections
    listen();

    // run the io_service in thread
    io_thread = std::thread( [&]{ m_io.run(); } ); // m_io = io_serivce

    while (true){
    }
}

void Server::listen(){
    std::shared_ptr<User> pUser(new User(m_io));
    m_acceptor->async_accept(pUser->socket, std::bind(&Server::onAccept, this, pUser));
}

Solution

  • Infinite loops are UB in C++.

    Your std::string msg is a local (parameter) and passing it to an async operation (async_write) is also UB.

    To start a thread, immediately followed by an infinite loop is useless, unless the objective is to waste a lot of CPU power. So, replace it with just

     m_io.run();
    

    Next up, the code is far from self contained and lacks all manner of error handling. Here's my imaged minimal completion, fixing the UBs already mentioned:

    #include <boost/asio.hpp>
    #include <iostream>
    #include <map>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using boost::system::error_code;
    using namespace std::placeholders;
    
    struct User {
        User(asio::io_service& io) : socket(io) {}
    
        tcp::socket socket;
    };
    
    struct Server {
        void run(uint16_t port) {
            m_acceptor = {m_io, {{}, port}};
            std::cout << "[DEBUG] bound on " << port << std::endl;
    
            listen();
    
            m_io.run();
        }
    
        void close_connection(int) {}
        std::map<int, std::shared_ptr<User>> m_users;
    
        void writeHandler(int id, error_code error, std::shared_ptr<void>) {
            if (!error) {
                std::cout << "[DEBUG] message broadcasted ";
            } else {
                close_connection(id);
            }
        }
    
        void broadcast(std::string msg, boost::system::error_code /*error*/) {
            for (auto& [id, usr] : m_users) { // for every user in unordered map of users
                asio::async_write(usr->socket, asio::buffer(*shared_msg),
                                  bind(&Server::writeHandler, this, id, _1, shared_msg));
            }
        }
    
        void listen() {
            std::shared_ptr<User> pUser(new User(m_io));
            m_acceptor.async_accept(pUser->socket, std::bind(&Server::onAccept, this, pUser));
        }
    
        void onAccept(std::shared_ptr<User> /*user*/) { listen(); }
    
        asio::io_service m_io;
        tcp::acceptor    m_acceptor{m_io};
    };
    
    int main() {
        Server s;
        s.run(8989);
    }
    

    Note that nothing ever broadcasts in the first place, so I'm not sure what the problem is.

    What I can see is that

    Here's what it would look like with those addressed:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <deque>
    #include <iostream>
    #include <map>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using boost::system::error_code;
    using namespace std::placeholders;
    
    using Message = std::string;
    
    struct UserSession : std::enable_shared_from_this<UserSession> {
        UserSession(tcp::socket s, int /*id*/) : socket_(std::move(s)) {
            std::cout << "New UserSession from " << ep_ << std::endl;
        }
        ~UserSession() {
            std::cout << "Closed UserSession from " << ep_ << std::endl;
        }
    
        void start() {
            read_loop();
        }
    
        void send(Message msg) {
            outbox_.push_back(std::move(msg));
            if (outbox_.size()==1)
                write_loop();
        }
    
      private:
        tcp::socket         socket_;
        tcp::endpoint       ep_; // cache for use after socket_ becomes invalid
        Message             incoming_;
        std::deque<Message> outbox_;
    
        void read_loop() {
            async_read(socket_, asio::dynamic_buffer(incoming_),
                       [this, self = shared_from_this()](error_code ec, size_t) {
                           if (!ec) {
                               // TODO
                               read_loop();
                           }
                       });
        }
    
        void write_loop() { 
            if (outbox_.empty())
                return;
            async_write(socket_, asio::buffer(outbox_.front()),
                        [this, self = shared_from_this()](error_code ec, size_t) {
                            if (!ec) {
                                outbox_.pop_front();
                                write_loop();
                            }
                        });
        }
    };
    
    using Handle = std::weak_ptr<UserSession>;
    
    struct Server {
        explicit Server(asio::any_io_executor ex, uint16_t port) : m_acceptor{ex, {{}, port}} {
            std::cout << "[DEBUG] bound on " << m_acceptor.local_endpoint() << std::endl;
        }
    
        void listen() { accept_loop(); }
    
        void broadcast(Message msg) {
            auto shared_msg = std::make_shared<Message>(std::move(msg));
            for (auto& [id, handle] : m_users) // for every user in unordered map of users
                if (auto usr = handle.lock())
                    usr->send(msg);
        }
    
      private:
        int next_user_id = 0;
        std::map<int, Handle> m_users;
    
        void accept_loop() {
            m_acceptor.async_accept([this](error_code ec, tcp::socket s) {
                if (!ec) {
                    auto id  = next_user_id++;
                    auto usr = std::make_shared<UserSession>(std::move(s), id);
                    m_users.emplace(id, usr);
                    usr->start();
    
                    accept_loop();
                }
            });
        }
    
        tcp::acceptor m_acceptor;
    };
    
    int main() {
        asio::io_context ioc;
    
        Server s(ioc.get_executor(), 8989);
        s.listen();
    
        ioc.run();
    }
    

    Local demo:

    enter image description here

    BONUS: Chat Server?

    Of course, still nothing ever calls broadcast. Assuming you might actually want something like a chat-server:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <deque>
    #include <iostream>
    #include <map>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using boost::system::error_code;
    using namespace std::placeholders;
    
    using Id = int;
    
    struct Message {
        Id          sender;
        std::string text;
    };
    
    struct IServer {
        virtual ~IServer() = default;
        virtual void broadcast(Message msg) = 0;
    };
    
    struct UserSession : std::enable_shared_from_this<UserSession> {
        UserSession(tcp::socket s, Id id, IServer& srv) : socket_(std::move(s)), id_(id), svr_(srv) {
            std::cout << "New UserSession from " << ep_ << std::endl;
        }
        ~UserSession() {
            std::cout << "Closed UserSession from " << ep_ << std::endl;
        }
    
        void start() {
            read_loop();
        }
    
        void send(Message msg) {
            outbox_.push_back(std::move(msg));
            if (outbox_.size()==1)
                write_loop();
        }
    
        Id get_id() const { return id_; }
    
      private:
        tcp::socket         socket_;
        Id                  id_;
        IServer&            svr_;
        tcp::endpoint       ep_; // cache for use after socket_ becomes invalid
        Message             incoming_;
        std::deque<Message> outbox_;
    
        void read_loop() {
            incoming_.sender = id_;
            async_read_until( //
                socket_, asio::dynamic_buffer(incoming_.text), "\n",
                [this, self = shared_from_this()](error_code ec, size_t n) {
                    std::cout << ep_ << ": Read " << ec.message() << "\n";
                    if (!ec) {
                        svr_.broadcast(std::move(incoming_));
                        incoming_.text.erase(0, n);
                        read_loop();
                    }
                });
        }
    
        void write_loop() { 
            if (outbox_.empty())
                return;
            async_write(socket_, asio::buffer(outbox_.front().text),
                        [this, self = shared_from_this()](error_code ec, size_t) {
                            if (!ec) {
                                outbox_.pop_front();
                                write_loop();
                            }
                        });
        }
    };
    
    using Handle = std::weak_ptr<UserSession>;
    
    struct Server : IServer {
        explicit Server(asio::any_io_executor ex, uint16_t port) : m_acceptor{ex, {{}, port}} {
            std::cout << "[DEBUG] bound on " << m_acceptor.local_endpoint() << std::endl;
        }
    
        void listen() { accept_loop(); }
    
        virtual void broadcast(Message msg) override {
            m_history.push_back(msg);
            while (m_history.size() > MAX_HIST)
                m_history.pop_front();
    
            for (auto& [id, handle] : m_users) // for every user in unordered map of users
                if (auto usr = handle.lock())
                    if (usr->get_id() != msg.sender)
                        usr->send(msg);
        }
    
      private:
        Id                   next_user_id = 0;
        std::map<Id, Handle> m_users;
        static size_t constexpr MAX_HIST = 100;
        std::deque<Message>  m_history;
    
        void accept_loop() {
            m_acceptor.async_accept([this](error_code ec, tcp::socket s) {
                if (!ec) {
                    erase_if(m_users, [](auto& pair) { return pair.second.expired(); }); // garbage collect
    
                    auto id  = next_user_id++;
                    auto usr = std::make_shared<UserSession>(std::move(s), id, *this);
                    m_users.emplace(id, usr);
    
                    for (auto& msg : m_history)
                        usr->send(msg);
                    broadcast({-1, "** Now " + std::to_string(m_users.size()) + " users online **\n"});
                    usr->start();
    
                    accept_loop();
                }
            });
        }
    
        tcp::acceptor m_acceptor;
    };
    
    int main() {
        asio::io_context ioc;
    
        Server s(ioc.get_executor(), 8989);
        s.listen();
    
        ioc.run();
    }
    

    With a local demo again:

    enter image description here