I'm working with the Boost Beast advanced server example (available at Boost.org) and am using it as a basis for my own server. My understanding is that in this example, both the session and the listener are self-sustaining objects; they keep themselves alive by always attaching a shared_ptr of themselves to a callback handler. This design seems very elegant and practical as it appears to simplify error handling by allowing the structure to self-destruct when there are no pending asynchronous operations.
However, I'm unclear about how the entire system is shut down. The code first calls ioc.stop() and then joins on the corresponding threads. I initially thought ioc.stop() would handle the cleanup of any pending handlers and associated shared_ptrs, but that does not seem to be the case. It seems more likely that any remaining handlers are destroyed by the final destruction of ioc itself, thereby cleaning up any remaining shared_ptrs.
In my server implementation, I've handled it by using a unique_ptr for the io_context and explicitly resetting it in the destructor of my server:
HTTPServer::HTTPServerImpl::~HTTPServerImpl()
{
    m_ioc->stop();
    for (auto& thread : m_iocThreads)
        thread.join();
    m_ioc.reset(); // reset the unique_ptr
}
I have a couple of questions regarding this shutdown process:
1.) Is the shutdown procedure in ~HTTPServerImpl, as adopted from the example, considered "clean," in terms of properly closing all sockets? How exactly does this happen? Through the destructors of beast::tcp_stream and tcp::acceptor when the shared_ptrs are destroyed? Or should I explicitly call cancel()/close() before stop() through some weak_ptr? (Is this cancel/close allowed from the main thread? Is it thread-safe?) Are all asynchronous operations aborted during m_ioc.reset(), or are some handlers still executed (e.g. timers)?
2.) What exactly does stop() do in this context? Does it simply prevent new handlers from being added, or is there more to it than that? The documentation says "abandoning unfinished operations"; what does that mean exactly? I can't see that this stops anything. No timers are aborted prematurely either.
It seems more likely that any remaining handlers are destroyed by the final destruction of ioc itself, thereby cleaning up any remaining shared_ptrs.
Indeed. stop() merely "stops the world" but doesn't "get rid of history". The destructor does.
Q1. Is the shutdown procedure in ~HTTPServerImpl, as adopted from the example, considered "clean," in terms of properly closing all sockets?
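Yes. It closes all sockets implicitly, because the sessions that own them are only kept alive by the reference count held in any pending async operation (shared_from_this). Once those handlers are destroyed, the last shared_ptr goes away and the socket destructors close the sockets.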
Q2. What exactly does stop() do in this context? Does it simply prevent new handlers from being added, or is there more to it than that?
It stops the scheduler. Frankly, that's an implementation detail. It functionally prevents any handler from being run and breaks any running event loops (e.g. io_context.run()).
You ask a lot of questions along the way, but I think it all boils down to these:
Or should I explicitly call cancel()/close() before stop()
I would, yes. Keep in mind that you should add some kind of timeout, to avoid situations where the shutdown deadlocks itself. This would be "defense against byzantine failure" (https://en.wikipedia.org/wiki/Byzantine_fault)
through some weak_ptr
That's how I usually do it. In fact you can probably search my answers using weak_ptr, cancel, session as keywords to find examples.
Is this cancel/close allowed from the main thread?
Not as long as other threads may access the IO objects.
Is it thread-safe?
cancel() has no special thread-safety guarantees, so the usual rules apply: distinct objects are safe to use concurrently, a shared object is not (e.g. https://www.boost.org/doc/libs/1_85_0/doc/html/boost_asio/reference/ip__tcp/socket.html#boost_asio.reference.ip__tcp.socket.thread_safety)
Here's how I'd probably write the same:
#define BOOST_ASIO_NO_DEPRECATED
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
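#include <algorithm>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <optional>
#include <queue>
#include <stop_token>
#include <string>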
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using beast::string_view;
using boost::asio::ip::tcp;
namespace mime { // see https://stackoverflow.com/a/48235134/85371
struct CiCmp {
template <typename R1, typename R2> bool operator()(R1 const& a, R2 const& b) const {
return boost::algorithm::ilexicographical_compare(a, b);
}
};
static std::map<string_view, string_view, CiCmp> const s_ext_map{
{".txt", "text/plain"},
{".htm", "text/html"},
{".html", "text/html"},
{".php", "text/html"},
{".css", "text/css"},
{".js", "application/javascript"},
{".json", "application/json"},
{".xml", "application/xml"},
{".swf", "application/x-shockwave-flash"},
{".flv", "video/x-flv"},
{".png", "image/png"},
{".jpe", "image/jpeg"},
{".jpeg", "image/jpeg"},
{".jpg", "image/jpeg"},
{".gif", "image/gif"},
{".bmp", "image/bmp"},
{".ico", "image/vnd.microsoft.icon"},
{".tif", "image/tiff"},
{".tiff", "image/tiff"},
{".svg", "image/svg+xml"},
{".svgz", "image/svg+xml"},
};
} // namespace mime
static string_view mime_type(string_view n_path) {
auto const ext = [](string_view n_path) {
auto pos = n_path.rfind(".");
return pos != string_view::npos ? n_path.substr(pos) : string_view{};
};
if (auto i = mime::s_ext_map.find(ext(n_path)); i != mime::s_ext_map.cend()) {
return i->second;
} else {
return "application/text";
}
}
// return path normalized for the platform
std::string path_cat(string_view base, string_view path) {
if (base.empty())
return std::string(path);
std::string result(base);
#if defined(_WIN32) || defined(__CYGWIN__)
char constexpr path_separator = '\\';
if (result.back() == path_separator)
result.resize(result.size() - 1);
result.append(path.data(), path.size());
for (auto& c : result)
if (c == '/')
c = path_separator;
#else
char constexpr path_separator = '/';
if (result.back() == path_separator)
result.resize(result.size() - 1);
result.append(path.data(), path.size());
#endif
return result;
}
// Return a response for the given request.
template <class Body, class Allocator>
http::message_generator //
handle_request(string_view doc_root, http::request<Body, http::basic_fields<Allocator>>&& req) {
auto const error = [&req](http::status code, std::string body) {
http::response<http::string_body> res{code, req.version(), std::move(body)};
res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(http::field::content_type, "text/html");
res.keep_alive(req.keep_alive());
res.prepare_payload();
return res;
};
if (req.method() != http::verb::get && req.method() != http::verb::head)
return error(http::status::bad_request, "Unknown HTTP-method");
// Request path must be absolute and not contain "..".
if (req.target().empty() || req.target()[0] != '/' || req.target().find("..") != string_view::npos)
return error(http::status::bad_request, "Illegal request-target");
std::string path = path_cat(doc_root, req.target());
if (req.target().back() == '/')
path.append("index.html");
// Attempt to open the file
beast::error_code ec;
http::file_body::value_type body;
body.open(path.c_str(), beast::file_mode::scan, ec);
if (ec == beast::errc::no_such_file_or_directory)
return error(http::status::not_found,
"The resource '" + std::string(req.target()) + "' was not found.");
else if (ec)
return error(http::status::internal_server_error, ec.message());
if (req.method() == http::verb::head) {
// Respond to HEAD request
http::response<http::empty_body> res{http::status::ok, req.version()};
res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(http::field::content_type, mime_type(path));
res.content_length(body.size());
res.keep_alive(req.keep_alive());
return res;
} else {
// Respond to GET request
auto const size = body.size(); // we need it after the move
http::response<http::file_body> res{std::piecewise_construct, std::make_tuple(std::move(body)),
std::make_tuple(http::status::ok, req.version())};
res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(http::field::content_type, mime_type(path));
res.content_length(size);
res.keep_alive(req.keep_alive());
return res;
}
}
void fail(beast::error_code ec, char const* what) { std::cerr << what << ": " << ec.message() << "\n"; }
// Echo session
class websocket_session : public std::enable_shared_from_this<websocket_session> {
websocket::stream<beast::tcp_stream> ws_;
tcp::endpoint ep_ = beast::get_lowest_layer(ws_).socket().remote_endpoint();
beast::flat_buffer buffer_;
std::stop_token token_;
std::stop_callback<std::function<void()>> stop_ //
{token_, [this] {
net::post(ws_.get_executor(), [this] {
ws_.next_layer().cancel(); // or do_close?
});
}};
public:
explicit websocket_session(tcp::socket&& socket, std::stop_token token)
: ws_(std::move(socket))
, token_(std::move(token)) {
std::cerr << "Websocket upgrade (" << ep_ << ")" << std::endl;
}
~websocket_session() {
std::cerr << "Websocket session destroyed (" << ep_ << ")" << std::endl;
}
template <class Body, class Allocator>
void do_accept(http::request<Body, http::basic_fields<Allocator>> req) {
ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::server));
ws_.set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " modernized-server");
}));
ws_.async_accept(req, beast::bind_front_handler(&websocket_session::on_accept, shared_from_this()));
}
private:
void on_accept(beast::error_code ec) {
if (ec)
return fail(ec, "accept");
do_read();
}
void do_read() {
ws_.async_read(buffer_, beast::bind_front_handler(&websocket_session::on_read, shared_from_this()));
}
void on_read(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
// This indicates that the websocket_session was closed
if (ec == websocket::error::closed)
return;
if (ec)
return fail(ec, "read");
// Echo the message
ws_.text(ws_.got_text());
ws_.async_write(buffer_.data(),
beast::bind_front_handler(&websocket_session::on_write, shared_from_this()));
}
void on_write(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec)
return fail(ec, "write");
// Clear the buffer
buffer_.consume(buffer_.size());
// Do another read
do_read();
}
};
// Handles an HTTP server connection
class http_session : public std::enable_shared_from_this<http_session> {
beast::tcp_stream stream_;
tcp::endpoint ep_ = stream_.socket().remote_endpoint();
beast::flat_buffer buffer_;
std::string doc_root_;
std::stop_token token_;
std::stop_callback<std::function<void()>> stop_{token_, [this] { cancel(); }};
static constexpr size_t queue_limit = 8; // max responses
std::queue<http::message_generator> response_queue_;
// allow re-init in place
std::optional<http::request_parser<http::string_body>> parser_;
public:
http_session(tcp::socket&& socket, std::string doc_root, std::stop_token token)
: stream_(std::move(socket))
, doc_root_(std::move(doc_root))
, token_(std::move(token)) {
static_assert(queue_limit > 0, "queue limit must be positive");
std::cerr << "HTTP session (" << ep_ << ")" << std::endl;
}
~http_session() {
if (stream_.socket().is_open()) // unless upgraded to websocket
std::cerr << "HTTP session destroyed (" << ep_ << ")" << std::endl;
}
void start() {
net::dispatch(stream_.get_executor(),
beast::bind_front_handler(&http_session::do_read, this->shared_from_this()));
}
void cancel() {
net::dispatch(stream_.get_executor(), [self = shared_from_this()] { self->stream_.cancel(); });
}
private:
void do_read() {
parser_.emplace();
parser_->body_limit(10000);
stream_.expires_after(std::chrono::seconds(30));
http::async_read(stream_, buffer_, *parser_,
beast::bind_front_handler(&http_session::on_read, shared_from_this()));
}
void on_read(beast::error_code ec, size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
// This means they closed the connection
if (ec == http::error::end_of_stream)
return do_close();
if (ec)
return fail(ec, "read");
// See if it is a WebSocket Upgrade
if (websocket::is_upgrade(parser_->get())) {
// transfer ownership to the websocket_session
std::make_shared<websocket_session>(stream_.release_socket(), token_)
->do_accept(parser_->release());
return;
}
// Handle an HTTP request
queue_write(handle_request(doc_root_, parser_->release()));
// If we aren't at the queue limit, try to pipeline another request
if (response_queue_.size() < queue_limit)
do_read();
}
void queue_write(http::message_generator response) {
response_queue_.push(std::move(response));
if (response_queue_.size() == 1)
do_write();
}
// The single write loop
void do_write() {
if (!response_queue_.empty()) {
bool keep_alive = response_queue_.front().keep_alive();
beast::async_write(
stream_, std::move(response_queue_.front()),
[this, self = shared_from_this(), keep_alive](beast::error_code ec,
size_t /*bytes_transferred*/) {
if (ec)
return fail(ec, "write");
if (!keep_alive) {
// "Connection: close" semantics
return do_close();
}
// Resume the read if it has been paused
if (response_queue_.size() == queue_limit)
do_read();
response_queue_.pop();
do_write();
}
);
}
}
void do_close() {
beast::error_code ec;
stream_.socket().shutdown(tcp::socket::shutdown_both, ec);
// At this point the connection is closed gracefully
}
};
// Accepts incoming connections and launches the sessions
class listener {
net::any_io_executor ex_;
tcp::acceptor acceptor_;
std::string doc_root_;
std::stop_source shutdown_;
public:
listener(net::any_io_executor ex, tcp::endpoint endpoint, std::string doc_root)
: ex_(ex)
, acceptor_(net::make_strand(ex), endpoint)
, doc_root_(std::move(doc_root)) {
acceptor_.set_option(net::socket_base::reuse_address(true));
accept_loop();
}
void shutdown() {
shutdown_.request_stop();
net::dispatch(acceptor_.get_executor(), [this] { acceptor_.cancel(); });
}
private:
void accept_loop() {
// The new connection gets its own strand
acceptor_.async_accept(net::make_strand(ex_), [this](beast::error_code ec, tcp::socket socket) {
if (!ec)
std::make_shared<http_session>(std::move(socket), doc_root_, shutdown_.get_token()) //
->start();
else
fail(ec, "accept");
if (!shutdown_.stop_requested())
accept_loop();
});
}
};
int main(int argc, char** argv) {
// Check command line arguments.
if (argc != 5) {
std::cerr << "Usage: modernized-server <address> <port> <doc_root> <threads>\n"
<< "Example:\n"
<< " modernized-server 0.0.0.0 8080 . 1\n";
return 1;
}
auto const address = net::ip::make_address(argv[1]);
auto const port = static_cast<uint16_t>(std::atoi(argv[2]));
auto const doc_root = std::string(argv[3]);
auto const threads = std::max<size_t>(1, std::atoi(argv[4]));
net::thread_pool ioc{threads};
// Create and launch a listening port
listener lsnr(ioc.get_executor(), tcp::endpoint{address, port}, doc_root);
net::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&](beast::error_code ec, int) {
if (!ec)
lsnr.shutdown(); // TODO add deadline with forced .stop()
});
ioc.join();
}
With a demo:
Salient changes:
- do_close does shutdown_both instead of shutdown_send; alternatively you might cancel() to stop a pending read. Otherwise do_close() will not actually cause the connection to be closed.
- std::stop_token with callbacks