Tags: c++, asynchronous, boost, boost-asio, shared-ptr

Boost.Asio daytime server example combined with std::async is not working


I am trying to learn boost asio ( boost 1.84, C++20, Ubuntu 23.04 ) and have adapted the following daytime server example a little bit from: https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html

I let make_daytime_string() sleep for 5 seconds to simulate heavy work, which I then offload via std::async. At the end of the std::async task I post an async_write, so that the async_write is executed later on the same thread(s) as the other completion handlers.

I have 2 questions about my procedure:

1.) Is the post even necessary, or is it allowed to call async_write directly from within the std::async task? Are there any pros/cons here?

2.) There is a problem with my variant: the tcp_connection destructor is no longer called and the client no longer receives a boost::asio::error::eof. The data (25 bytes) is transferred correctly to the client, but the client connection is only closed when I terminate the server. I suspect that I am doing something wrong with the shared_ptr?

Here is the adapted code:

    #include <ctime>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <boost/asio.hpp>

    using boost::asio::ip::tcp;

    // Global holder for the std::future returned by the std::async call in
    // tcp_connection::start(); every call to start() overwrites it.
    std::future< void > futureSink;

    // Builds the daytime reply. The timestamp is captured first; the call then
    // blocks for five seconds to simulate heavy work before the captured time is
    // formatted with ctime().
    std::string make_daytime_string()
    {
        const std::time_t captured = std::time( nullptr );

        // Simulated heavy workload.
        std::this_thread::sleep_for( std::chrono::seconds( 5 ) );

        return std::string( std::ctime( &captured ) );
    }

    class tcp_connection : public std::enable_shared_from_this< tcp_connection >
    {
    public:
        typedef std::shared_ptr< tcp_connection > pointer;

        static pointer create(boost::asio::io_context& io_context)
        {
            return pointer( new tcp_connection( io_context ) );
        }

        tcp::socket& socket()
        {
            return socket_;
        }

        void start()
        {
            auto self = shared_from_this();

            futureSink = std::async( std::launch::async,
                [ self, this ]()
                {
                    message_ = make_daytime_string();

                    boost::asio::post( socket_.get_executor(),
                        [ self ]()
                        {
                            boost::asio::async_write( self->socket_,
                                                      boost::asio::buffer( self->message_ ),
                                                      std::bind( &tcp_connection::handle_write, self,
                                                                 boost::asio::placeholders::error,
                                                                 boost::asio::placeholders::bytes_transferred ) );
                        });
                });
        }
    private:
        tcp_connection( boost::asio::io_context& io_context ) : socket_( io_context )
        {}

        void handle_write(const boost::system::error_code& /*error*/, size_t /*bytes_transferred*/)
        {}

        tcp::socket socket_;
        std::string message_;
    };

    class tcp_server
    {
    public:
        tcp_server( boost::asio::io_context& io_context ) : io_context_( io_context ),
            acceptor_( io_context, tcp::endpoint( tcp::v4(), 13 ) )
        {
            start_accept();
        }

    private:
        void start_accept()
        {
            tcp_connection::pointer new_connection = tcp_connection::create( io_context_ );

            acceptor_.async_accept( new_connection->socket(),
                                    std::bind( &tcp_server::handle_accept, this, new_connection,
                                               boost::asio::placeholders::error ) );
        }

        void handle_accept( tcp_connection::pointer new_connection, const boost::system::error_code& error )
        {
            if ( !error )
                new_connection->start();

            start_accept();
        }

        boost::asio::io_context& io_context_;
        tcp::acceptor acceptor_;
    };

    int main()
    {
        try
        {
            boost::asio::io_context io_context;
            tcp_server server( io_context );
            io_context.run();
        }
        catch ( std::exception& e )
        {
            std::cerr << e.what() << std::endl;
        }

        return 0;
    }

Edit 07.04.2023 16:08

I found an answer to question 2. Here the captured shared_ptr is not cleaned up, because I have moved the std::future to a global variable (fire & forget). The remedy is to move the shared_ptr as follows:

void start()
{
    auto self = shared_from_this();

    futureSink = std::async( std::launch::async,
                            [ self, this ]() mutable
                            {
                                message_ = make_daytime_string();

                                boost::asio::post( socket_.get_executor(),
                                                  [ self = std::move( self ) ]()
                                                  {
                                                      boost::asio::async_write( self->socket_,
                                                                               boost::asio::buffer( self->message_ ),
                                                                               std::bind( &tcp_connection::handle_write, self,
                                                                                         boost::asio::placeholders::error,
                                                                                         boost::asio::placeholders::bytes_transferred ) );
                                                  });
                            });
} 

Solution

    1. Yes the post is necessary. Wherever std::async runs the lambda it is by definition not on the service thread (which is an implicit strand).

    2. Indeed you are. If you consume futureSink you will actually see the destructor run again: http://coliru.stacked-crooked.com/a/47d92eea20083e0f


    Where to fix?

    First of all, std::async is not async. It's strictly a less reliable std::thread (at best).

    Your future is a global, which cannot work well with concurrent connections.

    Since really you just want to pool work on a non-service thread (so you don't block IO), I'd really just /do that/.

    Live On Coliru

    #include <boost/asio.hpp>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>
    
    namespace asio = boost::asio;
    using namespace std::chrono_literals;
    using namespace std::placeholders;
    using asio::ip::tcp;
    
    // Thread pool (constructed with 4) that tcp_connection::start() posts
    // simulate_work() to, so the blocking work does not run on the thread that
    // calls io_context.run().
    static asio::thread_pool pool(4); // how about some control over our threads
    
    // Stand-in for an expensive computation: blocks the calling thread for five
    // seconds and then returns a fixed status message.
    static std::string simulate_work() {
        constexpr std::chrono::seconds work_duration{5};
        std::this_thread::sleep_for(work_duration);
        return std::string{"Work simulation done"};
    }
    
    class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
      public:
        using pointer = std::shared_ptr<tcp_connection>;
    
        static pointer create(asio::io_context& io_context) { return pointer(new tcp_connection(io_context)); }
    
        tcp::socket& socket() { return socket_; }
    
        void start() {
            std::cerr << __PRETTY_FUNCTION__ << std::endl;
            auto self = shared_from_this();
    
            post(pool, [self, this]() {
                auto m = simulate_work();
                post(socket_.get_executor(),
                     [self, m = std::move(m)]() mutable { self->on_work_completed(std::move(m)); });
            });
        }
    
        ~tcp_connection() { std::cerr << __PRETTY_FUNCTION__ << std::endl; }
    
      private:
        tcp_connection(boost::asio::io_context& io_context) : socket_(io_context) {}
    
        void on_work_completed(std::string message) { // already on executor
            message_ = std::move(message);
            async_write(socket_, asio::buffer(message_),
                        bind(&tcp_connection::handle_write, shared_from_this(), _1, _2));
        }
    
        void handle_write(boost::system::error_code const& ec, size_t /*bytes_transferred*/) {
            std::cerr << ec.message() << ": " << __PRETTY_FUNCTION__ << std::endl;
        }
    
        tcp::socket socket_;
        std::string message_;
    };
    
    class tcp_server {
      public:
        tcp_server(asio::io_context& io_context)
            : io_context_(io_context)
            , acceptor_(io_context, {tcp::v4(), 1313}) {
            start_accept();
        }
    
      private:
        void start_accept() {
            tcp_connection::pointer new_connection = tcp_connection::create(io_context_);
    
            acceptor_.async_accept(new_connection->socket(),
                                   bind(&tcp_server::handle_accept, this, new_connection, _1));
        }
    
        void handle_accept(tcp_connection::pointer new_connection, boost::system::error_code const& error) {
            if (!error)
                new_connection->start();
    
            start_accept();
        }
    
        asio::io_context& io_context_;
        tcp::acceptor     acceptor_;
    };
    
    int main() {
        try {
            boost::asio::io_context io_context;
    
            tcp_server server(io_context);
            io_context.run();
            pool.join();
        } catch (std::exception const& e) {
            std::cerr << e.what() << std::endl;
        }
    }
    

    enter image description here

    BONUS/FANCY

    If you want to be a little bit fancy, you can supply an Asio-style initiation function async_simulate_work:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>
    
    namespace asio = boost::asio;
    using namespace std::chrono_literals;
    using namespace std::placeholders;
    using asio::ip::tcp;
    
    // Simulated heavy job: sleeps for five seconds on the calling thread, then
    // yields a fixed result string.
    static std::string simulate_work() {
        using std::chrono::seconds;

        std::this_thread::sleep_for(seconds{5});

        std::string outcome = "Work simulation done";
        return outcome;
    }
    
    /// Asio-style initiating function: runs simulate_work() on a detached thread
    /// and completes with its result.
    ///
    /// @param token  Completion token with signature void(std::string).
    /// @return       Whatever the completion token's async_result prescribes.
    ///
    /// The completion handler is delivered through its associated executor (for
    /// example one attached with bind_executor). A handler without an associated
    /// executor gets the default system executor, on which dispatch() invokes it
    /// directly on the worker thread.
    template <typename Token> auto async_simulate_work(Token&& token) {
        return asio::async_initiate<Token, void(std::string)>( //
            [](auto handler) {
                // Calling a bound handler directly does NOT switch to its
                // executor, so fetch the associated executor and complete via it.
                auto ex = asio::get_associated_executor(handler);

                // Keep the executor's context from running out of work while
                // the detached thread is still busy.
                auto work = asio::make_work_guard(ex);

                std::thread( //
                    [h = std::move(handler), ex, work = std::move(work)]() mutable {
                        std::string result = simulate_work();

                        asio::dispatch(ex, [h = std::move(h), r = std::move(result)]() mutable {
                            std::move(h)(std::move(r));
                        });

                        // The completion is queued (or already ran); release the guard.
                        work.reset();
                    })
                    .detach();
            },
            token);
    }
    
    class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
      public:
        using pointer = std::shared_ptr<tcp_connection>;
    
        static pointer create(asio::io_context& io_context) { return pointer(new tcp_connection(io_context)); }
    
        tcp::socket& socket() { return socket_; }
    
        void start() {
            std::cerr << __PRETTY_FUNCTION__ << std::endl;
            auto self = shared_from_this();
    
            async_simulate_work(bind_executor(socket_.get_executor(), [self, this](std::string message) {
                message_ = std::move(message);
                async_write(socket_, asio::buffer(message_),
                            bind(&tcp_connection::handle_write, shared_from_this(), _1, _2));
            }));
        }
    
        ~tcp_connection() { std::cerr << __PRETTY_FUNCTION__ << std::endl; }
    
      private:
        tcp_connection(boost::asio::io_context& io_context) : socket_(io_context) {}
    
        void handle_write(boost::system::error_code const& ec, size_t /*bytes_transferred*/) {
            assert(socket_.get_executor().target<asio::io_context::executor_type>()->running_in_this_thread());
            std::cerr << ec.message() << ": " << __PRETTY_FUNCTION__ << std::endl;
        }
    
        tcp::socket socket_;
        std::string message_;
    };
    
    class tcp_server {
      public:
        tcp_server(asio::io_context& io_context)
            : io_context_(io_context)
            , acceptor_(io_context, {tcp::v4(), 1313}) {
            start_accept();
        }
    
      private:
        void start_accept() {
            tcp_connection::pointer new_connection = tcp_connection::create(io_context_);
    
            acceptor_.async_accept(new_connection->socket(),
                                   bind(&tcp_server::handle_accept, this, new_connection, _1));
        }
    
        void handle_accept(tcp_connection::pointer new_connection, boost::system::error_code const& error) {
            if (!error)
                new_connection->start();
    
            start_accept();
        }
    
        asio::io_context& io_context_;
        tcp::acceptor     acceptor_;
    };
    
    int main() {
        try {
            boost::asio::io_context io_context;
    
            tcp_server server(io_context);
            io_context.run();
        } catch (std::exception const& e) {
            std::cerr << e.what() << std::endl;
        }
    }
    

    With another local demo:

    enter image description here