boostchannelasio

Correct usage of concurrent_channel in Asio?


I am currently learning how to use the concurrent_channel provided in boost asio. Unfortunately, there are no usage examples for concurrent_channel in boost asio, so I'm trying to write some demos myself for verification.

I have a couple of questions regarding concurrent_channel: 1. Does my test program have any undefined behavior? 2. What is the correct usage of concurrent_channel in asio?

#include <exception>
#include <string>
#include <thread>
#include <vector>

#include <boost/asio/error.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>

namespace asio = boost::asio;
namespace sys  = boost::system;

// asio's concurrent channel
using concurrent_channel_t =
    asio::experimental::concurrent_channel<void(sys::error_code, int)>;

// An exception handler for coroutines that rethrows any exception thrown by
// the coroutine. We handle all known error cases with error_code's. If an
// exception is raised, it's something critical, e.g. out of memory.
// Re-raising it in the exception handler will case io_context::run()
// to throw and terminate the program.
static constexpr auto rethrow_handler = [](std::exception_ptr ex) {
    if (ex) {
        std::rethrow_exception(ex);
    }
};

void send_func(int thrid, concurrent_channel_t& chan)
{
    std::printf("thread %d: start to send data\n", thrid);

    for (int i = 0; i < 100; ++i) {
        if (!chan.try_send(sys::error_code{}, i)) {
            chan.async_send(sys::error_code{}, 0, [thrid](sys::error_code ec) {
                std::printf("thread %d: send data error: %s\n",
                            thrid,
                            ec.message().c_str());
            });
        }
    }

    std::printf("thread %d: send data done\n", thrid);
}

void receive_func(concurrent_channel_t& chan, asio::yield_context yield)
{
    std::printf("start to receive data from channel\n");
    while (true) {
        sys::error_code ec;
        auto n = chan.async_receive(yield[ec]);
        if (ec) {
            if (ec != asio::experimental::error::channel_closed) {
                std::printf("receiving data error: %s\n", ec.message().c_str());
            }
            return;
        }
        std::printf("received data %d\n", n);
    }
}

int main(int argc, char* argv[])
{
    try {
        int thread_count;
        if (argc < 2) {
            thread_count = 4;
        } else {
            thread_count = std::stoi(argv[1]);
        }

        asio::io_context ioctx;
        concurrent_channel_t chan(ioctx.get_executor(), 4096);

        // Launch a stackfull coroutine for receiving data from channel.
        asio::spawn(
            ioctx.get_executor(),
            [&chan](asio::yield_context yield) mutable {
                receive_func(chan, yield);
            },
            rethrow_handler);

        // Start N threads for sending data to channel concurrently.
        std::vector<std::thread> threads;
        for (int i = 0; i < thread_count; ++i) {
            threads.emplace_back(
                [thrid = i, &chan]() { send_func(thrid, chan); });
        }

        // Start a new background thread for joining sending threads.
        std::thread([&threads, &chan]() {
            for (auto& th: threads) {
                th.join();
            }

            chan.close();
        }).detach();

        // Run io_context endless.
        ioctx.run();
        return 0;
    } catch (const std::exception& ex) {
        std::printf("unexpected exception: %s\n", ex.what());
        return 1;
    }
}


Solution

    1. No I don't see UB in your code.

    2. What is "correct"? If you mean "typical", then I'd say it's more typical that channel writes also arise from asynchronous operations.

    In that sense it's more typical to see a thread pool for any async work which then causes messages written to the channel.

    In general, try to move away from focusing on (or thinking about) threads. Instead, focus on (asynchronous) process flows, that run on top of your IO service threads.

    Here's my clarified version:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/asio/experimental/concurrent_channel.hpp>
    #include <boost/asio/spawn.hpp>
    using namespace std::chrono_literals;
    
    namespace asio = boost::asio;
    using boost::system::error_code;
    
    // asio's concurrent channel
    using Channel = asio::experimental::concurrent_channel<void(error_code, int)>;
    
    static void rethrow_handler(std::exception_ptr ex) {
        if (ex) std::rethrow_exception(ex);
    };
    
    static std::atomic_int tid_gen = 0;
    thread_local int       tid     = tid_gen++;
    
    void producer(Channel& chan, asio::yield_context yield) {
        std::printf("producer in thread %d: start\n", tid);
    
        for (int i = 0; i < 100; ++i) {
            error_code ec;
            chan.async_send({}, i, yield[ec]);
            if (ec) {
                std::printf("thread %d: send data error: %s\n", tid, ec.message().c_str());
                break;
            }
        }
    
        std::printf("producer in thread %d: exit\n", tid);
    }
    
    void consumer(Channel& chan, asio::yield_context yield) {
        std::printf("consumer in thread %d: start\n", tid);
    
        for (error_code ec; !ec;) {
            int n = chan.async_receive(yield[ec]);
            if (!ec)
                std::printf("consumer in thread %d: received %d\n", tid, n);
            else
                // if (ec != asio::experimental::error::channel_closed)
                std::printf("receiving data error: %s\n", ec.message().c_str());
        }
        std::printf("consumer in thread %d: exit\n", tid);
    }
    
    int main() try {
        constexpr int     producer_count = 4;
        asio::thread_pool ioc;
    
        Channel chan(ioc.get_executor(), 4096);
    
        spawn(ioc, [&chan](asio::yield_context yield) { consumer(chan, yield); }, rethrow_handler);
    
        // Start N producers for sending data to channel concurrently
        for (int i = 0; i < producer_count; ++i)
            spawn(ioc, [&chan](asio::yield_context yield) { producer(chan, yield); }, rethrow_handler);
    
        std::this_thread::sleep_for(1s);
        chan.close();
    
        ioc.join();
    } catch (std::exception const& ex) {
        std::printf("unexpected exception: %s\n", ex.what());
        return 1;
    }
    

    Prints something like:

    consumer in thread 0: start
    producer in thread 1: start
    producer in thread 3: start
    producer in thread 2: start
    producer in thread 4: start
    consumer in thread 5: received 0
    consumer in thread 6: received 0
    consumer in thread 0: received 0
    consumer in thread 7: received 0
    consumer in thread 8: received 1
    consumer in thread 1: received 1
    consumer in thread 9: received 1
    consumer in thread 9: received 1
    consumer in thread 9: received 2
    consumer in thread 0: received 2
    consumer in thread 10: received 2
    consumer in thread 3: received 2
    consumer in thread 11: received 3
    consumer in thread 6: received 3
    consumer in thread 4: received 3
    consumer in thread 9: received 3
    consumer in thread 0: received 4
    consumer in thread 10: received 4
    consumer in thread 3: received 4
    consumer in thread 12: received 4
    consumer in thread 6: received 5
    ...
    consumer in thread 8: received 94
    consumer in thread 3: received 98
    consumer in thread 3: received 95
    consumer in thread 3: received 99
    consumer in thread 3: received 96
    consumer in thread 5: received 97
    consumer in thread 5: received 98
    consumer in thread 4: received 99
    receiving data error: Channel closed
    consumer in thread 12: exit