c++multithreadingboostsynchronizationasio

Implementing readers-writer lock in boost.asio with writer prioritization


My application handle 3 types of tasks that are originated by either async external source (User press button), or internally (timeout expires, ongoing task trigger another tasks, network event, etc).

Each of the tasks above is characterized by a long waiting period (i.e. sending network request or waiting for network response), in which it can cooperatively suspended and move to another task.

Also, each task (regardless the type) uses a new connection for a single send-and-receive operation.

http::response<http::dynamic_body> resp;
boost::beast::http::request<boost::beast::http::string_body> req;
boost::beast::flat_buffer buf; 

http::async_write(stream, req, yield);
http::async_read(stream, buf, resp, yield);

I wish to execute all those tasks from the same group of threads that runs the same basic io_context. Here's the relevant code :

boost::thread_group threads;
num_threads = 0; 
for(; num_threads < kMaxThreads ; num_threads++) {
   threads.create_thread([] {
      try {
        io_context_.run();
      } catch (const std::exception& e) {
        print("exception found");
      }
   });
}

Task description:

The ideal way to implement Type2 is by using post/strand without async actions (yield)

boost::asio::strand<boost::asio::io_context::executor_type> strand_

// initialization in class c'tor
strand_(io_context_.get_executor())

boost::asio::post(strand_, []() {...}

However, since my network API is using coroutines, and in order to allow type 3 tasks to run, I need to use spawn/strand, but somehow prevent type 2 tasks from entering the queue. So my first Question is how to implement it properly ?

with the current implementation, I'm guaranteed that tasks from the first type, will be executed in parallel, while the second type will be executed alone, and all pending tasks will be waiting for its termination.

Now I want to prioritize the second tasks, so once such task enter the queue, it will enter first, after all currently executing tasks are terminated. Is the any way to do it using boost.asio ?


Solution

  • It seems like your story is missing a party.

    E.g. "I've got 3 types of tasks": even rephrasing to the less anthropocentric "there are 3 types of tasks" or "the application causes 3 types of tasks", the question rising in my mind is: "who is causing the tasks".

    Another angle where this pops back up (and not for the first time) is when you describe Type3 tasks: "Type3: sending http requests without token so they can be executed in parallel to both the first 2 types" - the quiet omission is what ties these tasks to your application anyways.

    I have a feeling that the answer is more than "nothing at all" (though Type3 seems like it might be close? Who knows). Specifically, it Type2 inhibits Type1 from proceeding, surely that is on the basis of a authentication "session" (or whatever you name a token lifecycle).

    Now, everything could be simple in that there's only one instance of such a session and therefore the authentication token/connection endpoint are a singleton to your application domain. However, usually the picture is that an application handles multiple sessions.


    Regardless of whether the session is singleton or not, your domain model will require you to acknowledge it, lest you run into heaps of accidental complexity (as it seems you are already discovering).

    You could then give your session the ability to queue messages, distinguishing between those requiring authentication and those not requiring authentication. Then when the application logic determines it comes time to refresh/replace the authentication/endpoint tuple, it could

    There are quite a number of possible approaches to implementing this logic, ranging from (but not limited to) a

    Events?

    However you choose to implement it, you will want a primitive that allows you to post a continuation in response to an application-specific Event.

    This can usually be modeled using an Asio timer object. E.g. an "infinite" timer can serve as a manual-reset event (e.g. in the Type1 consumer strand cancel() it to signal the last pending Type1 request was just completed).

    The timer interfaces have a dual interface that allows you to manually cause completion of waits, or "resumption of waiters":

    Method Description
    cancel Cancel any asynchronous operations that are waiting on the timer.
    cancel_one Cancels one asynchronous operation that is waiting on the timer.

    Note that for robustness you might use something like expires_at(clock_type::timepoint::min()) instead of cancel() to make it easier to reliably detect edge-cases (see Cancelling boost asio deadline timer safely).

    Summary/Misc

    I hope this gives you some ideas to progress your design. Some loose change at the closing gate:

    Q. Now I want to prioritize the second tasks, so once such task enter the queue, it will enter first, after all currently executing tasks are terminated. Is the any way to do it using boost.asio ?

    There are some examples that show how to make prioritized handlers, with varying levels of elegance:

    I'd urge you to focus on modeling your application domain first. Then if an existing tool fits, you can employ it. Otherwise, always prefer to code your core business to your needs.


    BONUS TAKE

    It just occurred to me that a natural way to model things would be to let the scheduler freely pick up any task. However, as a necessary step in your Type1 coro, you would asynchronously request an up-to-date authentication token:

     auto token = session->get_current_auth_token(yield);
    

    Now, you can model the Type2 a bit like this:

     invalidate_current_auth_token();
     session_idle.async_wait(yield);
     
     auto new_token = async_request_new_token(yield);
     set_current_auth_token(new_token);
    

    Where session_idle could be a timer object as previously described. The set_current_auth_token could use an other timer object to signal token readiness or perhaps use an experimental::promise?

    Disclaimer: I have no experience using those.