boost-asioboost-beastbeast-websockets

Boost Async Websocket Server Issue


I've wrote an async websocket thru boost.beast. But when I try to run it, I cannot connect it.

The server code like below. When I try to connect my websocket server, my chrome shows status connecting. When I debug thru VS2017, it never run into the lambda expression in run().


iListener::iListener( boost::asio::io_context& ioc,boost::asio::ip::tcp::endpoint endpoint)  : acceptor_(ioc), socket_(ioc) {

    boost::system::error_code ec;

    std::cout<<"iListener"<<std::endl;
    // Open the acceptor
    acceptor_.open(endpoint.protocol(), ec);
    if (ec) {
        // fail(ec, "open");
        return;
    }

    // Allow address reuse
    acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec);
    if (ec) {
        // fail(ec, "set_option");
        return;
    }

    // Bind to the server address
    acceptor_.bind(endpoint, ec);
    if (ec) {
        // fail(ec, "bind");
        return;
    }

    // Start listening for connections
    acceptor_.listen(
            boost::asio::socket_base::max_listen_connections, ec);
    if (ec) {

        std::cout << ec.message() << "   listen" << std::endl;
        // fail(ec, "listen");
        return;
    }
}

iListener::~iListener() {

}

void iListener::run() {
    if (!acceptor_.is_open()) {
        return;
    }
    std::cout<<"iListener run"<<std::endl;
    while (true) {
        acceptor_.async_accept(socket_, [&](boost::system::error_code ec1) {
            std::cout << "now run listener" << std::endl;
            if (ec1) {
                std::cout<<ec1.message()<<"   accept"<<std::endl;
                // fail(ec, "accept");
            } else {
                // Create the session and run it
                std::make_shared<NormalSession>(std::move(socket_))->run();
            }
        });
    }


}

void iListener::initListener(const std::string &addressStr, unsigned short port, int threads){
    auto const address = boost::asio::ip::make_address(addressStr);
    boost::asio::io_context ioc{threads};
    std::make_shared<iListener>(ioc, boost::asio::ip::tcp::endpoint{address, port})->run();
    std::vector<std::thread> v;
    v.reserve(threads - 1);
    for(auto i = threads - 1; i > 0; --i)
        v.emplace_back(
                [&ioc]
                {
                    ioc.run();
                });
    ioc.run();
}

And when I try to connect it on Console of Chrome. It taks a long time to connect, and then shows failed.


So I change back, as the example of boost.It works.


void iListener::run() {
    if (!acceptor_.is_open()) {
        return;
    }
   // std::cout<<"iListener run"<<std::endl;
   // while (true) {
   //     acceptor_.async_accept(socket_, [&](boost::system::error_code ec1) {
            //std::cout << "now run listener" << std::endl;
   //         if (ec1) {
   //             std::cout<<ec1.message()<<"   accept"<<std::endl;
   //             // fail(ec, "accept");
   //         } else {
   //             // Create the session and run it
   //             std::make_shared<NormalSession>(std::move(socket_))->run();
   //         }
   //     });
   // }
    do_accept();

}

void iListener::do_accept() {
    acceptor_.async_accept(
        socket_,
        std::bind(
            &iListener::on_accept,
            shared_from_this(),
            std::placeholders::_1));
}

void    iListener::on_accept(boost::system::error_code ec) {
    if (ec)
    {
        std::cout << ec.message() << "   on_accept" << std::endl;
    }
    else
    {
        // Create the session and run it
        std::make_shared<NormalSession>(std::move(socket_))->run();
    }

    // Accept another connection
    do_accept();
}

I have two questions:

1.Why I use lambda, it will SOF , but example won't. 2.When I use while(), it not work, why? Is there any different between lambda expression and std::bind()??


So another question, what's different between two code blocks below?

void iListener::do_accept() {
    acceptor_.async_accept(
        socket_,
        [&](boost::system::error_code ec1) mutable {
        on_accept(ec1);
    }
}


void iListener::do_accept() {
    acceptor_.async_accept(
        socket_,
        std::bind(
            &iListener::on_accept,
            shared_from_this(),
            std::placeholders::_1));
}

When I use the top one, it return error_code 995.


Solution

  • EDIT

    What is the difference between bind and lambda? In the former you prolong lifetime of iListener instance, in the latter you don't.

    We need to start from this line:

    std::make_shared<iListener>(ioc, boost::asio::ip::tcp::endpoint{address, port})->run();
    // [a]
    

    if you don't extend lifetime of iListener in run, in [a] line iListener instance will be destroyed.

    As one of params of bind you are passing shared_from_this, it creates shared_ptr from this pointer, so functor object returned by bind keeps smart pointer to iListener instance prolongs its lifetime.

    lifetime of iListener is not extended, you call async_accept passing lambda without increasing reference counter for the current object i.e. for which do_accept is called. So async_accept returns immediately, do_accept ends, finally run ends too, and object created by std::make_shared<iListener>(ioc, boost::asio::ip::tcp::endpoint{address, port}) is deleted.

    You need to update ref counter by passing shared_ptr by value into your lambda:

    void iListener::do_accept() {
        auto sp = shared_from_this();
        acceptor_.async_accept(
            socket_,
            [&,sp](boost::system::error_code ec1) mutable 
          {
            on_accept(ec1);
          }
    }
    

    Handler (in your case the body of lambda) for a task initiated by async_accept is called from io_context::run, you cannot see this execution because your code hangs on this line:

    std::make_shared<iListener>(ioc, boost::asio::ip::tcp::endpoint{address, port})->run();
    

    this creates iListener instance and calls run which contains the infinite loop and never ends:

    while (true) { // INFINITE LOOP
        acceptor_.async_accept(socket_, [&](boost::system::error_code ec1) {
            std::cout << "now run listener" << std::endl;
            if (ec1) {
                std::cout<<ec1.message()<<"   accept"<<std::endl;
                // fail(ec, "accept");
            } else {
                // Create the session and run it
                std::make_shared<NormalSession>(std::move(socket_))->run();
            }
        });
    }
    

    so you cannot reach the lines that starts io_context::run in which handlers can be called.

    Fix: before starting io_context::run you can start another thread where iListener::run is executed.

    Visit Boost Asio examples to see how async_accept is used. Common way is to call async_accept from its handler, but if you want to do this, your iListener should derive from enable_shared_from_this to prolong its lifetime when passing into handler.


    Another problem is with socket_ data member. I assume you want to hold one socket per session, but now your code doesn't handle it correctly. You have only one instance of socket_ which is moved into NormalSession if a new connection was established. So when async_accept is called second time you are passing INVALID socket. It cannot work. It leads to undefined behaviour.

    After executing the line below

    std::make_shared<NormalSession>(std::move(socket_))->run();
    

    you can forget about socket_.

    async_accept is overloaded, you can use the version taking handler with newly accepted socket.

    But if you want to stay with the current version taking socket, you need to ensure that every time async_accept is called it takes unique instance of socket.