Tags: c++, boost-asio, shared-ptr, destructor, boost-beast

Clarification on Properly Shutting Down a Boost Beast Server


I'm working with the Boost Beast advanced server example (available at Boost.org) and am using it as a basis for my own server. My understanding is that in this example, both the session and the listener are self-sustaining objects; they keep themselves alive by always attaching a shared_ptr of themselves to a callback handler. This design seems very elegant and practical as it appears to simplify error handling by allowing the structure to self-destruct when there are no pending asynchronous operations.

However, I'm unclear about how the entire system is shut down. The code first calls ioc.stop() and then joins the corresponding threads. I initially thought ioc.stop() would handle the cleanup of any pending handlers and associated shared_ptrs, but that does not seem to be the case. It seems more likely that any remaining handlers are destroyed by the final destruction of ioc itself, thereby cleaning up any remaining shared_ptrs.

In my server implementation, I've handled it by using a unique_ptr for the io_context and explicitly resetting it in the destructor of my server:

HTTPServer::HTTPServerImpl::~HTTPServerImpl()
{
    m_ioc->stop();

    for (auto& thread : m_iocThreads)
        thread.join();

    m_ioc.reset(); // reset the unique_ptr
}

I have a couple of questions regarding this shutdown process:

1.) Is the shutdown procedure in ~HTTPServerImpl, as adopted from the example, considered "clean," in terms of properly closing all sockets? How exactly does this happen: through the destructors of beast::tcp_stream and tcp::acceptor when the shared_ptrs are destroyed? Or should I explicitly call cancel()/close() before stop() through some weak_ptr? Is this cancel/close allowed from the main thread? Is it thread-safe? Are all asynchronous operations aborted during m_ioc.reset(), or are some handlers (e.g. timers) still executed?

2.) What exactly does stop() do in this context? Does it simply prevent new handlers from being added, or is there more to it than that? The documentation says "abandoning unfinished operations"; what does that mean exactly? I can't see that it stops anything: no timers are aborted prematurely either.


Solution

  • It seems more likely that any remaining handlers are destroyed by the final destruction of ioc itself, thereby cleaning up any remaining shared_ptrs.

    Indeed. stop() merely "stops the world" but doesn't "get rid of history".

    The io_context destructor does.

    Q1. Is the shutdown procedure in ~HTTPServerImpl, as adopted from the example, considered "clean," in terms of properly closing all sockets?

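    Yes. It closes all sockets implicitly: the sessions are only kept alive by the shared_ptr copies (shared_from_this) held in pending async operations, so once those handlers are destroyed the sessions are destroyed too, and the destructors of their IO objects close the sockets.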

    Q2. What exactly does stop() do in this context? Does it simply prevent new handlers from being added, or is there more to it than that?

    It stops the scheduler. Frankly, that's an implementation detail. It functionally prevents any handler from being run and breaks any running event loops (e.g. io_context.run()).

    You ask a lot of questions along the way, but I think it all boils down to these:

    Or should I explicitly call cancel()/close() before stop()

    I would, yes. Keep in mind that you should build in some kind of timeout, so that a graceful shutdown that never completes cannot hang the process. This would be "defense against byzantine failure" (https://en.wikipedia.org/wiki/Byzantine_fault)
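One way to structure such a bounded shutdown, sketched with the standard library only. `shutdown_with_deadline` and its three parameters are hypothetical; in a real server `request_cancel` might cancel the acceptor and sessions, `all_done` might be fulfilled when the last session is destroyed, and `force_stop` might call stop() on the io_context.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <future>

// Hypothetical helper: ask politely, wait up to `deadline`, then force.
// Returns true if everything finished gracefully within the deadline.
bool shutdown_with_deadline(std::function<void()> const& request_cancel,
                            std::future<void> const&     all_done,
                            std::function<void()> const& force_stop,
                            std::chrono::milliseconds    deadline) {
    assert(all_done.valid());

    request_cancel(); // e.g. cancel the acceptor and every live session

    if (all_done.wait_for(deadline) == std::future_status::ready)
        return true; // graceful path: nothing left to force

    force_stop(); // e.g. ioc.stop(); remaining handlers are abandoned
    return false;
}
```

The caller still has to join its threads afterwards; the helper only decides whether the forced path was needed.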

    through some weak_ptr

    That's how I usually do it. In fact you can probably search my answers using weak_ptr, cancel, session as keywords to find examples.
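The weak_ptr idea in isolation, as a standard-library sketch. `Session` and `SessionRegistry` are hypothetical stand-ins, and the registry is deliberately not thread-safe: it assumes add() and cancel_all() are called from one thread or under a lock.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for a session; only the cancel() hook matters here.
struct Session {
    bool cancelled = false;
    void cancel() { cancelled = true; } // a real session would cancel its stream
};

// Observes sessions without extending their lifetime.
class SessionRegistry {
    std::vector<std::weak_ptr<Session>> sessions_;

  public:
    void add(std::shared_ptr<Session> const& s) {
        assert(s);
        sessions_.push_back(s); // stored as weak_ptr: no ownership taken
    }

    // Cancels every session that is still alive; returns how many were reached.
    std::size_t cancel_all() {
        std::size_t reached = 0;
        for (auto& w : sessions_) {
            if (auto s = w.lock()) { // empty if the session already destroyed itself
                s->cancel();
                ++reached;
            }
        }
        sessions_.clear();
        return reached;
    }
};
```

Because the registry holds only weak_ptrs, sessions keep destroying themselves when their last pending operation completes, exactly as in the original design.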

    Is this cancel/close allowed from the main thread?

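    Not directly, as long as other threads may access the same IO objects. Run the call on the object's executor instead, as cancel() and shutdown() do with net::dispatch in the code below.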

    Is it thread-safe?

    cancel() has no special thread-safety guarantees, so the usual rules apply: distinct objects are safe to use concurrently, a shared object is not (e.g. https://www.boost.org/doc/libs/1_85_0/doc/html/boost_asio/reference/ip__tcp/socket.html#boost_asio.reference.ip__tcp.socket.thread_safety)

    Modernize

    Here's how I'd probably write the same:


    #define BOOST_ASIO_NO_DEPRECATED
    #include <boost/algorithm/string.hpp>
    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
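    #include <algorithm>
    #include <csignal>
    #include <cstdlib>
    #include <functional>
    #include <iostream>
    #include <map>
    #include <memory>
    #include <optional>
    #include <queue>
    #include <stop_token>
    #include <string>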
    namespace beast     = boost::beast;     // from <boost/beast.hpp>
    namespace http      = beast::http;      // from <boost/beast/http.hpp>
    namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
    namespace net       = boost::asio;      // from <boost/asio.hpp>
    using beast::string_view;
    using boost::asio::ip::tcp;
    
    namespace mime { // see https://stackoverflow.com/a/48235134/85371
        struct CiCmp {
            template <typename R1, typename R2> bool operator()(R1 const& a, R2 const& b) const {
                return boost::algorithm::ilexicographical_compare(a, b);
            }
        };
    
        static std::map<string_view, string_view, CiCmp> const s_ext_map{
            {".txt",  "text/plain"},
            {".htm",  "text/html"},
            {".html", "text/html"},
            {".php",  "text/html"},
            {".css",  "text/css"},
            {".js",   "application/javascript"},
            {".json", "application/json"},
            {".xml",  "application/xml"},
            {".swf",  "application/x-shockwave-flash"},
            {".flv",  "video/x-flv"},
            {".png",  "image/png"},
            {".jpe",  "image/jpeg"},
            {".jpeg", "image/jpeg"},
            {".jpg",  "image/jpeg"},
            {".gif",  "image/gif"},
            {".bmp",  "image/bmp"},
            {".ico",  "image/vnd.microsoft.icon"},
            {".tif",  "image/tiff"},
            {".tiff", "image/tiff"},
            {".svg",  "image/svg+xml"},
            {".svgz", "image/svg+xml"},
        };
    } // namespace mime
    
    static string_view mime_type(string_view n_path) {
        auto const ext = [](string_view n_path) {
            auto pos = n_path.rfind(".");
            return pos != string_view::npos ? n_path.substr(pos) : string_view{};
        };
    
        if (auto i = mime::s_ext_map.find(ext(n_path)); i != mime::s_ext_map.cend()) {
            return i->second;
        } else {
            return "application/text";
        }
    }
    
    // return path normalized for the platform
    std::string path_cat(string_view base, string_view path) {
        if (base.empty())
            return std::string(path);
        std::string result(base);
    #if defined(_WIN32) || defined(__CYGWIN__)
        char constexpr path_separator = '\\';
        if (result.back() == path_separator)
            result.resize(result.size() - 1);
        result.append(path.data(), path.size());
        for (auto& c : result)
            if (c == '/')
                c = path_separator;
    #else
        char constexpr path_separator = '/';
        if (result.back() == path_separator)
            result.resize(result.size() - 1);
        result.append(path.data(), path.size());
    #endif
        return result;
    }
    
    // Return a response for the given request.
    template <class Body, class Allocator>
    http::message_generator //
    handle_request(string_view doc_root, http::request<Body, http::basic_fields<Allocator>>&& req) {
        auto const error = [&req](http::status code, std::string body) {
            http::response<http::string_body> res{code, req.version(), std::move(body)};
            res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
            res.set(http::field::content_type, "text/html");
            res.keep_alive(req.keep_alive());
            res.prepare_payload();
            return res;
        };
    
        if (req.method() != http::verb::get && req.method() != http::verb::head)
            return error(http::status::bad_request, "Unknown HTTP-method");
    
        // Request path must be absolute and not contain "..".
        if (req.target().empty() || req.target()[0] != '/' || req.target().find("..") != string_view::npos)
            return error(http::status::bad_request, "Illegal request-target");
    
        std::string path = path_cat(doc_root, req.target());
        if (req.target().back() == '/')
            path.append("index.html");
    
        // Attempt to open the file
        beast::error_code           ec;
        http::file_body::value_type body;
        body.open(path.c_str(), beast::file_mode::scan, ec);
    
        if (ec == beast::errc::no_such_file_or_directory)
            return error(http::status::not_found,
                         "The resource '" + std::string(req.target()) + "' was not found.");
        else if (ec)
            return error(http::status::internal_server_error, ec.message());
    
        if (req.method() == http::verb::head) {
            // Respond to HEAD request
            http::response<http::empty_body> res{http::status::ok, req.version()};
            res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
            res.set(http::field::content_type, mime_type(path));
            res.content_length(body.size());
            res.keep_alive(req.keep_alive());
            return res;
        } else {
            // Respond to GET request
            auto const size = body.size(); // we need it after the move
    
            http::response<http::file_body> res{std::piecewise_construct, std::make_tuple(std::move(body)),
                                                std::make_tuple(http::status::ok, req.version())};
            res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
            res.set(http::field::content_type, mime_type(path));
            res.content_length(size);
            res.keep_alive(req.keep_alive());
            return res;
        }
    }
    
    void fail(beast::error_code ec, char const* what) { std::cerr << what << ": " << ec.message() << "\n"; }
    
    // Echo session
    class websocket_session : public std::enable_shared_from_this<websocket_session> {
        websocket::stream<beast::tcp_stream>      ws_;
        tcp::endpoint                             ep_ = beast::get_lowest_layer(ws_).socket().remote_endpoint();
        beast::flat_buffer                        buffer_;
        std::stop_token                           token_;
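        std::stop_callback<std::function<void()>> stop_ //
            {token_, [this] {
                 // weak_ptr: the posted handler may run after the session is gone
                 net::post(ws_.get_executor(), [w = weak_from_this()] {
                     if (auto self = w.lock())
                         self->ws_.next_layer().cancel(); // or do_close?
                 });
             }};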
    
      public:
        explicit websocket_session(tcp::socket&& socket, std::stop_token token)
            : ws_(std::move(socket))
            , token_(std::move(token)) {
            std::cerr << "Websocket upgrade (" << ep_ << ")" << std::endl;
        }
    
        ~websocket_session() {
            std::cerr << "Websocket session destroyed (" << ep_ << ")" << std::endl;
        }
    
        template <class Body, class Allocator>
        void do_accept(http::request<Body, http::basic_fields<Allocator>> req) {
            ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::server));
    
            ws_.set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
                res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " modernized-server");
            }));
    
            ws_.async_accept(req, beast::bind_front_handler(&websocket_session::on_accept, shared_from_this()));
        }
    
      private:
        void on_accept(beast::error_code ec) {
            if (ec)
                return fail(ec, "accept");
            do_read();
        }
    
        void do_read() {
            ws_.async_read(buffer_, beast::bind_front_handler(&websocket_session::on_read, shared_from_this()));
        }
    
        void on_read(beast::error_code ec, std::size_t bytes_transferred) {
            boost::ignore_unused(bytes_transferred);
    
            // This indicates that the websocket_session was closed
            if (ec == websocket::error::closed)
                return;
    
            if (ec)
                return fail(ec, "read");
    
            // Echo the message
            ws_.text(ws_.got_text());
            ws_.async_write(buffer_.data(),
                            beast::bind_front_handler(&websocket_session::on_write, shared_from_this()));
        }
    
        void on_write(beast::error_code ec, std::size_t bytes_transferred) {
            boost::ignore_unused(bytes_transferred);
    
            if (ec)
                return fail(ec, "write");
    
            // Clear the buffer
            buffer_.consume(buffer_.size());
    
            // Do another read
            do_read();
        }
    };
    
    // Handles an HTTP server connection
    class http_session : public std::enable_shared_from_this<http_session> {
        beast::tcp_stream                         stream_;
        tcp::endpoint                             ep_ = stream_.socket().remote_endpoint();
        beast::flat_buffer                        buffer_;
        std::string                               doc_root_;
        std::stop_token                           token_;
        std::stop_callback<std::function<void()>> stop_{token_, [this] { cancel(); }};
    
        static constexpr size_t             queue_limit = 8; // max responses
        std::queue<http::message_generator> response_queue_;
    
        // allow re-init in place
        std::optional<http::request_parser<http::string_body>> parser_;
    
      public:
        http_session(tcp::socket&& socket, std::string doc_root, std::stop_token token)
            : stream_(std::move(socket))
            , doc_root_(std::move(doc_root))
            , token_(std::move(token)) {
            static_assert(queue_limit > 0, "queue limit must be positive");
            std::cerr << "HTTP session (" << ep_ << ")" << std::endl;
        }
    
        ~http_session() {
            if (stream_.socket().is_open()) // unless upgraded to websocket
                std::cerr << "HTTP session destroyed (" << ep_ << ")" << std::endl;
        }
    
        void start() {
            net::dispatch(stream_.get_executor(),
                          beast::bind_front_handler(&http_session::do_read, this->shared_from_this()));
        }
    
        void cancel() {
            net::dispatch(stream_.get_executor(), [self = shared_from_this()] { self->stream_.cancel(); });
        }
    
      private:
        void do_read() {
            parser_.emplace();
            parser_->body_limit(10000);
            stream_.expires_after(std::chrono::seconds(30));
            http::async_read(stream_, buffer_, *parser_,
                             beast::bind_front_handler(&http_session::on_read, shared_from_this()));
        }
    
        void on_read(beast::error_code ec, size_t bytes_transferred) {
            boost::ignore_unused(bytes_transferred);
    
            // This means they closed the connection
            if (ec == http::error::end_of_stream)
                return do_close();
    
            if (ec)
                return fail(ec, "read");
    
            // See if it is a WebSocket Upgrade
            if (websocket::is_upgrade(parser_->get())) {
                // transfer ownership to the websocket_session
                std::make_shared<websocket_session>(stream_.release_socket(), token_)
                    ->do_accept(parser_->release());
                return;
            }
    
            // Handle an HTTP request
            queue_write(handle_request(doc_root_, parser_->release()));
    
            // If we aren't at the queue limit, try to pipeline another request
            if (response_queue_.size() < queue_limit)
                do_read();
        }
    
        void queue_write(http::message_generator response) {
            response_queue_.push(std::move(response));
    
            if (response_queue_.size() == 1)
                do_write();
        }
    
        // The single write loop
        void do_write() {
            if (!response_queue_.empty()) {
                bool keep_alive = response_queue_.front().keep_alive();
    
                beast::async_write(
                    stream_, std::move(response_queue_.front()),
                    [this, self = shared_from_this(), keep_alive](beast::error_code ec,
                                                                  size_t /*bytes_transferred*/) {
                        if (ec)
                            return fail(ec, "write");
    
                        if (!keep_alive) {
                            // "Connection: close" semantics
                            return do_close();
                        }
    
                        // Resume the read if it has been paused
                        if (response_queue_.size() == queue_limit)
                            do_read();
    
                        response_queue_.pop();
    
                        do_write();
                    }
    
                );
            }
        }
    
        void do_close() {
            beast::error_code ec;
            stream_.socket().shutdown(tcp::socket::shutdown_both, ec);
            // At this point the connection is closed gracefully
        }
    };
    
    // Accepts incoming connections and launches the sessions
    class listener {
        net::any_io_executor ex_;
        tcp::acceptor        acceptor_;
        std::string          doc_root_;
        std::stop_source     shutdown_;
    
      public:
        listener(net::any_io_executor ex, tcp::endpoint endpoint, std::string doc_root)
            : ex_(ex)
            , acceptor_(net::make_strand(ex), endpoint)
            , doc_root_(std::move(doc_root)) {
            acceptor_.set_option(net::socket_base::reuse_address(true));
            accept_loop();
        }
    
        void shutdown() {
            shutdown_.request_stop();
            net::dispatch(acceptor_.get_executor(), [this] { acceptor_.cancel(); });
        }
    
      private:
    
        void accept_loop() {
            // The new connection gets its own strand
            acceptor_.async_accept(net::make_strand(ex_), [this](beast::error_code ec, tcp::socket socket) {
                if (!ec)
                    std::make_shared<http_session>(std::move(socket), doc_root_, shutdown_.get_token()) //
                        ->start();
                else
                    fail(ec, "accept");
    
                if (!shutdown_.stop_requested())
                    accept_loop();
            });
        }
    };
    
    int main(int argc, char** argv) {
        // Check command line arguments.
        if (argc != 5) {
            std::cerr << "Usage: modernized-server <address> <port> <doc_root> <threads>\n"
                      << "Example:\n"
                      << "    modernized-server 0.0.0.0 8080 . 1\n";
            return 1;
        }
        auto const address  = net::ip::make_address(argv[1]);
        auto const port     = static_cast<uint16_t>(std::atoi(argv[2]));
        auto const doc_root = std::string(argv[3]);
        auto const threads  = std::max<size_t>(1, std::atoi(argv[4]));
    
        net::thread_pool ioc{threads};
    
        // Create and launch a listening port
        listener lsnr(ioc.get_executor(), tcp::endpoint{address, port}, doc_root);
    
        net::signal_set signals(ioc, SIGINT, SIGTERM);
        signals.async_wait([&](beast::error_code ec, int) {
            if (!ec)
                lsnr.shutdown(); // TODO add deadline with forced .stop()
        });
    
        ioc.join();
    }
    

    With a demo:

    [image: demo run of the server]

    Salient changes: