Tags: boost, boost-asio, boost-beast, boost-beast-websocket

Understand the usage of strand without locking


Reference: websocket_client_async_ssl.cpp strands

Question 1> Here is my understanding:

Given a few async operations bound to the same strand, the strand guarantees that all associated async operations are executed as a strictly sequential invocation.

Does this mean that all of the above async operations will be executed by the same thread? Or does it just mean that, at any time, only one async operation will be executed by any available thread?

Question 2> The boost::asio::make_strand function creates a strand object for an executor or execution context.

session(net::io_context& ioc, ssl::context& ctx)
    : resolver_(net::make_strand(ioc))
    , ws_(net::make_strand(ioc), ctx)
    

Here, resolver_ and ws_ each have their own strand, but I have trouble understanding how each strand applies to which async operations.

For example, in the following chain of async initiations and handlers, which functions (i.e. initiations or handlers) are bound to the same strand and will not run simultaneously?

run
  =>resolver_.async_resolve -->session::on_resolve 
    =>beast::get_lowest_layer(ws_).async_connect -->session::on_connect
      =>ws_.next_layer().async_handshake --> session::on_ssl_handshake
        =>ws_.async_handshake --> session::on_handshake

(async initiation ==> completion handler)

Question 3> How can we retrieve the strand from executor? Is there any difference between these two?

get_associated_executor get_executor

io_context::get_executor: Obtains the executor associated with the io_context.

get_associated_executor: Helper function to obtain an object's associated executor.

Question 4> Is it correct to use the following method to bind a deadline_timer to the io_context to prevent a race condition?

All other parts of the code are the same as in the example websocket_client_async_ssl.cpp.

session(net::io_context& ioc, ssl::context& ctx)
    : resolver_(net::make_strand(ioc))
    , ws_(net::make_strand(ioc), ctx),
    d_timer_(ws_.get_executor())
{     }

void on_heartbeat_write( beast::error_code ec, std::size_t bytes_transferred)
{
  d_timer_.expires_from_now(boost::posix_time::seconds(5));
  d_timer_.async_wait(beast::bind_front_handler( &session::on_heartbeat, shared_from_this()));
}

void on_heartbeat(const boost::system::error_code& ec)
{
    ws_.async_write( net::buffer(text_ping_), beast::bind_front_handler( &session::on_heartbeat_write, shared_from_this()));
}

void on_handshake(beast::error_code ec)
{
    d_timer_.expires_from_now(boost::posix_time::seconds(5));
    d_timer_.async_wait(beast::bind_front_handler( &session::on_heartbeat, shared_from_this()));
    ws_.async_write(net::buffer(text_), beast::bind_front_handler(&session::on_write, shared_from_this()));
}

Note: I used d_timer_(ws_.get_executor()) to initialize the deadline_timer and hoped that this makes sure we don't write or read the websocket at the same time. Is this the right way to do it?


Solution

  • Question 1

    Does this mean that all above async operations will be executed by a same thread? Or it just says that at any time, only one async operation will be executed by any available thread?

    The latter.
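    A runnable sketch (not from the original post) demonstrating "the latter": four threads run the io_context, and handlers posted through one strand never overlap, yet the strand does not pin them to any single thread.

    ```cpp
    #include <boost/asio.hpp>
    #include <atomic>
    #include <iostream>
    #include <mutex>
    #include <set>
    #include <thread>
    #include <vector>

    namespace net = boost::asio;

    int main() {
        net::io_context ioc;
        auto strand = net::make_strand(ioc);

        std::atomic<int>  in_flight{0};
        std::atomic<bool> overlapped{false};
        std::mutex mx;
        std::set<std::thread::id> seen; // threads that actually ran handlers

        for (int i = 0; i < 1000; ++i)
            net::post(strand, [&] {
                // If another handler were running concurrently, the previous
                // value would be nonzero:
                if (in_flight.fetch_add(1) != 0) overlapped = true;
                {
                    std::lock_guard<std::mutex> lk(mx);
                    seen.insert(std::this_thread::get_id());
                }
                in_flight.fetch_sub(1);
            });

        std::vector<std::thread> pool;
        for (int i = 0; i < 4; ++i)
            pool.emplace_back([&] { ioc.run(); });
        for (auto& t : pool) t.join();

        // seen.size() may well be > 1: the strand serializes, it does not
        // pin work to one thread.
        std::cout << (overlapped ? "overlapped" : "serialized") << "\n";
    }
    ```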

    Question 2

    Here, resolver_ and ws_ each have their own strand,

    Let me interject that I think that's unnecessarily confusing in the example. They could (should, conceptually) have used the same strand, but I guess they didn't want to go through the trouble of storing a strand. I'd probably have written:

    explicit session(net::io_context& ioc, ssl::context& ctx)
        : resolver_(net::make_strand(ioc))
        , ws_(resolver_.get_executor(), ctx) {}
    

    The initiation functions are called where you decide. The completion handlers are dispatch-ed on the executor that belongs to the IO object that you call the operation on, unless the completion handler is bound to a different executor (e.g. using bind_executor, see get_associated_executor). In the vast majority of circumstances in modern Asio, you will not bind handlers; instead you "bind IO objects" to the proper executors. This means less typing, and makes it much harder to forget.

    So in effect, all the async-initiations in the chain except for the one in run() are all on a strand, because the IO objects are tied to strand executors.
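    To illustrate how a handler can be bound to a different executor than the one its IO object owns, here is a minimal, runnable sketch (not from the linked example; the names `ioc`, `strand`, and `timer` are local to this sketch) using `net::bind_executor` as mentioned above:

    ```cpp
    #include <boost/asio.hpp>
    #include <chrono>
    #include <iostream>

    namespace net = boost::asio;

    int main() {
        net::io_context ioc;
        auto strand = net::make_strand(ioc);

        // The timer itself is NOT on the strand...
        net::steady_timer timer(ioc);
        timer.expires_after(std::chrono::milliseconds(1));

        // ...but its completion handler is, because bind_executor
        // explicitly associates the handler with `strand`:
        timer.async_wait(net::bind_executor(
            strand, [](boost::system::error_code ec) {
                std::cout << "handler ran on the strand: " << !ec << "\n";
            }));

        ioc.run();
    }
    ```

    In modern code you would usually do the opposite (construct the timer from the strand executor directly), which is exactly the "binding IO objects" approach described above.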

    You have to remember to dispatch on the strand when some outside user calls into your classes (e.g. often to stop). It is therefore a good idea to develop a convention. I'd personally make all the "unsafe" methods and members private:, so I will often have pairs like:

      public:
        void stop() {
            dispatch(strand_, [self=shared_from_this()] { self->do_stop(); });
        }
    
      private:
        void do_stop() {
            beast::get_lowest_layer(ws_).cancel();
        }
    

    Side Note:

    In this particular example, there is only one (main) thread running/polling the io service, so the whole point is moot. But as I explained recently (Does mulithreaded http processing with boost asio require strands?), the examples are here to show some common patterns that allow one to do "real life" work as well.

    Bonus: Handler Tracking

    Let's use BOOST_ASIO_ENABLE_HANDLER_TRACKING to get some insight.¹ Running a sample session shows something like


    If you squint a little, you can see that all the strand executors are the same:

    0*1|resolver@0x559785a03b68.async_resolve
    1*2|strand_executor@0x559785a02c50.execute
    2*3|socket@0x559785a05770.async_connect
    3*4|strand_executor@0x559785a02c50.execute
    4*5|socket@0x559785a05770.async_send
    5*6|strand_executor@0x559785a02c50.execute
    6*7|socket@0x559785a05770.async_receive
    7*8|strand_executor@0x559785a02c50.execute
    8*9|socket@0x559785a05770.async_send
    9*10|strand_executor@0x559785a02c50.execute
    10*11|socket@0x559785a05770.async_receive
    11*12|strand_executor@0x559785a02c50.execute
    12*13|deadline_timer@0x559785a05958.async_wait
    12*14|socket@0x559785a05770.async_send
    14*15|strand_executor@0x559785a02c50.execute
    15*16|socket@0x559785a05770.async_receive
    16*17|strand_executor@0x559785a02c50.execute
    17*18|socket@0x559785a05770.async_send
    13*19|strand_executor@0x559785a02c50.execute
    18*20|strand_executor@0x559785a02c50.execute
    20*21|socket@0x559785a05770.async_receive
    21*22|strand_executor@0x559785a02c50.execute
    22*23|deadline_timer@0x559785a05958.async_wait
    22*24|socket@0x559785a05770.async_send
    24*25|strand_executor@0x559785a02c50.execute
    25*26|socket@0x559785a05770.async_receive
    26*27|strand_executor@0x559785a02c50.execute
    23*28|strand_executor@0x559785a02c50.execute
    

    Question 3

    How can we retrieve the strand from executor?

    You don't[*]. However make_strand(s) returns an equivalent strand if s is already a strand.

    [*] By default, Asio's IO objects use the type-erased executor (asio::executor or asio::any_io_executor depending on version). So technically you could ask it about its target_type() and, after comparing the type id to some expected types use something like target<net::strand<net::io_context::executor_type>>() to access the original, but there's really no use. You don't want to be inspecting the implementation details. Just honour the handlers (by dispatching them on their associated executors like Asio does).
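    Purely for illustration (the advice above is *not* to do this in real code), here is what that inspection looks like with a recent Boost; `target<T>()` only yields a pointer when the erased type is exactly `T`:

    ```cpp
    #include <boost/asio.hpp>
    #include <cassert>
    #include <iostream>

    namespace net = boost::asio;

    int main() {
        net::io_context ioc;

        // Type-erase a strand executor:
        net::any_io_executor ex = net::make_strand(ioc);

        using strand_t = net::strand<net::io_context::executor_type>;

        // Recover the concrete strand; non-null because the target type
        // matches exactly:
        auto* s = ex.target<strand_t>();

        // A plain io_context executor does not match:
        net::any_io_executor plain = ioc.get_executor();
        assert(plain.target<strand_t>() == nullptr);

        std::cout << "strand recovered: " << (s != nullptr) << "\n";
    }
    ```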

    Is there any difference between these two? get_associated_executor get_executor

    get_executor gets an owned executor from an IO object. It is a member function.

    asio::get_associated_executor gets associated executors from handler objects. You will observe that get_associated_executor(ws_) doesn't compile (although some IO objects may satisfy the criteria to allow it to work).
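    A short sketch contrasting the two (the names `timer` and `h` are local to this sketch): `get_executor()` is a member on IO objects and execution contexts, while `asio::get_associated_executor` is a free function that queries a *handler*, falling back to a candidate executor when the handler has no association:

    ```cpp
    #include <boost/asio.hpp>
    #include <cassert>
    #include <iostream>

    namespace net = boost::asio;

    int main() {
        net::io_context ioc;
        auto strand = net::make_strand(ioc);

        net::steady_timer timer(strand);

        // Member function: the executor the IO object owns (the strand).
        auto ex = timer.get_executor();
        assert(ex == strand);

        // Free function: the executor associated with a handler. An unbound
        // lambda has none, so the fallback (second argument) is returned.
        auto h   = [] {};
        auto hex = net::get_associated_executor(h, ioc.get_executor());
        assert(hex == ioc.get_executor());

        std::cout << "ok\n";
    }
    ```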

    Question 4

    Is it correct that I use the following method to bind deadline_timer to io_context

    You will notice that you did the same as I already mentioned above to tie the timer IO object to the same strand executor. So, kudos.

    to prevent race condition?

    You don't prevent race conditions here. You prevent data races. That is because in on_heartbeat you access the ws_ object which is an instance of a class that is NOT threadsafe. In effect, you're sharing access to non-threadsafe resources, and you need to serialize access, hence you want to be on the strand that all other accesses are also on.

    Note: [...] and hoped that it will make sure they don't write or read the websocket at the same time. Is this the right way to do it?

    Yes, this is a good start, but it is not enough.

    Firstly, you can write and read at the same time: Beast's websocket stream allows one outstanding read and one outstanding write concurrently, as long as you never start a second operation of the same kind before the first completes.

    In particular, your on_heartbeat might be safely serialized so you don't have a data race on calling the async_write initiation function. However, you need more checks to know whether a write operation is already (still) in progress. One way to achieve that is to have a queue with outgoing messages. If you have strict heartbeat requirements and high load, you might need a priority-queue here.
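    One way to sketch that outgoing queue (all names here are hypothetical, not from the linked example; the "write" is simulated with a timer so the sketch runs without a live websocket): only the front message is ever being written, so write initiations never overlap.

    ```cpp
    #include <boost/asio.hpp>
    #include <algorithm>
    #include <cassert>
    #include <chrono>
    #include <deque>
    #include <iostream>
    #include <string>

    namespace net = boost::asio;

    struct writer {
        net::steady_timer fake_io;            // stands in for ws_.async_write
        std::deque<std::string> queue_;
        int in_flight = 0, max_in_flight = 0; // instrumentation only

        explicit writer(net::any_io_executor ex) : fake_io(ex) {}

        void send(std::string msg) {          // call from the strand
            queue_.push_back(std::move(msg));
            if (queue_.size() == 1)           // no write in progress: start one
                do_write();
        }

        void do_write() {
            ++in_flight;
            max_in_flight = std::max(max_in_flight, in_flight);
            fake_io.expires_after(std::chrono::milliseconds(1));
            fake_io.async_wait([this](boost::system::error_code) {
                --in_flight;
                queue_.pop_front();           // "on_write": front message done
                if (!queue_.empty())          // drain the rest, one at a time
                    do_write();
            });
        }
    };

    int main() {
        net::io_context ioc;
        writer w(net::make_strand(ioc));
        w.send("heartbeat");
        w.send("payload-1");
        w.send("payload-2");
        ioc.run();
        assert(w.queue_.empty());
        std::cout << "max in flight: " << w.max_in_flight << "\n";
    }
    ```

    For strict heartbeat requirements under load, the plain deque would become the priority queue mentioned above, with heartbeats jumping ahead of bulk payloads.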


    ¹ I simplified the example by replacing the stream type with the Asio native ssl::stream<tcp::socket>. This means we don't get all the internal timers that deal with tcp_stream expirations. See https://pastebin.ubuntu.com/p/sPRYh6Xbwz/