c++ websocket boost boost-asio boost-beast

Designing TCP to WebSocket


I'm planning to build a TCP-to-WebSocket tunnel: multiple TCP connections carried over one WebSocket connection (I know this requires multiplexing, but one thing at a time). This means having a TCP socket listening locally and forwarding everything received on it through the WebSocket.

I have been learning Boost Asio and Boost Beast recently and have written the following code so far. It's still far from complete, but as I'm pretty new to coroutines and Boost, I have some design questions.

#include "server_certificate.hpp"  // Got this from Boost Beast
#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/spawn.hpp>

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/json/src.hpp>
#include <iostream>
#include <thread>
#include <vector>
#include <memory>
#include <time.h>

#pragma comment(lib, "crypt32.lib")
#pragma comment(lib, "libssl.lib")
#pragma comment(lib, "libcrypto.lib")

using namespace std;
namespace json = boost::json;
namespace asio = boost::asio;
namespace this_coro = asio::this_coro;
using asio::ip::tcp;
using boost::system::error_code;
using namespace asio::experimental::awaitable_operators;

namespace beast = boost::beast;         // from <boost/beast.hpp>
namespace http = beast::http;           // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio;            // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl;       // from <boost/asio/ssl.hpp>
using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>

template <typename T> using Defer = asio::deferred_t::as_default_on_t<T>;
using Socket = Defer<tcp::socket>;
using Acceptor = Defer<tcp::acceptor>;



void
fail(beast::error_code ec, char const* what)
{
    std::cerr << what << ": " << ec.message() << "\n";
}



/*  TCP listening locally   */
asio::awaitable<void> tcp_session(Socket socket)
{
    auto ep = socket.remote_endpoint();
    std::cout << "New TCP session for " << ep << "\n";
    asio::streambuf buf;

    size_t requests_handled = 0;
    for (boost::system::error_code ec; !ec;)
    {
        auto token = redirect_error(asio::deferred, ec);

        auto n = co_await socket.async_read_some(buf.prepare(1024), token);
        buf.commit(n);

        auto eof = ec == asio::error::eof;
        if (buf.size() > 0)
        {
            requests_handled += 1;
            auto n = co_await async_write(socket, buf, token);
            buf.consume(n);
        }
    }

    std::cout << "End of " << ep << " TCP session" << endl;
}


asio::awaitable<void> tcp_listener(tcp::endpoint& ep)
{
    auto ex = co_await this_coro::executor;
    Acceptor acc(ex, ep);
    for (;;)
    {
        co_spawn(ex, tcp_session(co_await acc.async_accept()), asio::detached);
    }
}



/*  HTTPS WebSocket listening on Internet   */
// Echoes back all received WebSocket messages
void do_session(
    websocket::stream<beast::ssl_stream<beast::tcp_stream>>&ws,
    net::yield_context yield)
{
    beast::error_code ec;

    // Set the timeout.
    beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30));

    // Perform the SSL handshake
    ws.next_layer().async_handshake(ssl::stream_base::server, yield[ec]);
    if (ec)
    {
        return fail(ec, "handshake");
    }

    // Turn off the timeout on the tcp_stream, because
    // the websocket stream has its own timeout system.
    beast::get_lowest_layer(ws).expires_never();

    // Set suggested timeout settings for the websocket
    ws.set_option(
        websocket::stream_base::timeout::suggested(
            beast::role_type::server));

    // Set a decorator to change the Server of the handshake
    ws.set_option(websocket::stream_base::decorator(
        [](websocket::response_type& res)
        {
            res.set(http::field::server,
            std::string(BOOST_BEAST_VERSION_STRING) +
            " websocket-server-coro-ssl");
        }));

    // Accept the websocket handshake
    ws.async_accept(yield[ec]);
    if (ec)
    {
        return fail(ec, "accept");
    }

    for (;;)
    {
        // This buffer will hold the incoming message
        beast::flat_buffer buffer;

        // Read a message
        ws.async_read(buffer, yield[ec]);

        // This indicates that the session was closed
        if (ec == websocket::error::closed)
        {
            break;
        }

        if (ec)
        {
            return fail(ec, "read");
        }

        // Echo the message back
        ws.text(ws.got_text());
        ws.async_write(buffer.data(), yield[ec]);
        if (ec)
        {
            return fail(ec, "write");
        }
    }
}

void do_listen(
    net::io_context& ioc,
    ssl::context& ctx,
    tcp::endpoint endpoint,
    net::yield_context yield)
{
    beast::error_code ec;

    // Open the acceptor
    tcp::acceptor acceptor(ioc);
    acceptor.open(endpoint.protocol(), ec);
    if (ec)
    {
        return fail(ec, "open");
    }

    // Allow address reuse
    acceptor.set_option(net::socket_base::reuse_address(true), ec);
    if (ec)
    {
        return fail(ec, "set_option");
    }

    // Bind to the server address
    acceptor.bind(endpoint, ec);
    if (ec)
    {
        return fail(ec, "bind");
    }

    // Start listening for connections
    acceptor.listen(net::socket_base::max_listen_connections, ec);
    if (ec)
    {
        return fail(ec, "listen");
    }

    for (;;)
    {
        tcp::socket socket(ioc);
        acceptor.async_accept(socket, yield[ec]);
        if (ec)
        {
            fail(ec, "accept");
        }
        else
        {
            boost::asio::spawn(
                acceptor.get_executor(),
                std::bind(
                    &do_session,
                    websocket::stream<beast::ssl_stream<
                    beast::tcp_stream>>(std::move(socket), ctx),
                    std::placeholders::_1),
                // we ignore the result of the session,
                // most errors are handled with error_code
                boost::asio::detached);
        }
    }
}



int main()
{
    // TCP local
    asio::io_context tcp_ioc(1);
    tcp::endpoint tcp_ep(boost::asio::ip::address::from_string("127.0.0.1"), 9898);
    co_spawn(tcp_ioc, tcp_listener(tcp_ep), asio::detached);
    std::thread t1([&tcp_ioc]() {  tcp_ioc.run(); });
    cout << "TCP listening on port 9898..." << endl;

    // HTTPS WebSocket
    net::io_context wss_ioc;
    tcp::endpoint wss_ep(boost::asio::ip::address::from_string("127.0.0.1"), 8989);
    ssl::context ctx{ssl::context::tlsv12};
    load_server_certificate(ctx);
    // Spawn a listening port
    boost::asio::spawn(wss_ioc,
        std::bind(
            &do_listen,
            std::ref(wss_ioc),
            std::ref(ctx),
            wss_ep,
            std::placeholders::_1),
        // on completion, spawn will call this function
        [](std::exception_ptr ex)
        {
            // if an exception occurred in the coroutine,
            // it's something critical, e.g. out of memory
            // we capture normal errors in the ec
            // so we just rethrow the exception here,
            // which will cause `ioc.run()` to throw
            if (ex)
                std::rethrow_exception(ex);
        });
    std::thread t2([&wss_ioc] { wss_ioc.run(); });
    cout << "HTTPS WebSocket listening on port 8989..." << endl;

    // Wait for threads
    t1.join();
    t2.join();

    return 0;
}

I thought of keeping a global list of connected sockets (using Socket = Defer<tcp::socket>;) with a unique id for each socket, plus one global variable for the WebSocket stream.

I will have one thread per accepted TCP connection and one thread for the WebSocket.

With the sockets and the WebSocket stream in global variables, I can pick which socket to read from or write to.

Extra information

I haven't gone into multiplexing in this question; I just want to know whether having a global variable shared between different threads is the right approach.

At this point I want the TCP threads to have access to the WSS stream and vice versa.

I tried the following:

using Socket = Defer<tcp::socket>;

typedef struct TCPConn TCPConn_t;
struct TCPConn
{
    Socket* s;
    int unique_id;
};

std::list<TCPConn_t> TCPSockets;

/*  TCP listening locally   */
asio::awaitable<void> tcp_session(Socket socket)
{
    auto ep = socket.remote_endpoint();
    std::cout << "New TCP session for " << ep << "\n";
    asio::streambuf buf;

    TCPConn_t tcpconn;
    tcpconn.s = &socket;
    tcpconn.unique_id = 123213; // something random
    TCPSockets.push_back(tcpconn);
}

I have a TCP port listening locally and a WSS endpoint exposed to the Internet. The WSS client will initiate the connection, and once it is established, the program will accept TCP connections and forward whatever it receives from them over the WSS connection. It might also be called a reverse WebSocket tunnel.

I hope this helps to clarify my initial question.

Thanks


Solution

  • It feels a lot like stitched-together code.

    I've reduced your 267 lines into 110, without essentially changing a thing:

    #include "server_certificate.hpp" // Got this from Boost Beast
    #include <boost/asio.hpp>
    #include <boost/asio/spawn.hpp>
    #include <boost/beast.hpp>
    #include <boost/beast/ssl.hpp>
    #include <boost/beast/websocket/ssl.hpp>
    #include <iostream>
    #include <syncstream>
    #ifdef _MSC_VER
        #pragma comment(lib, "crypt32.lib")
        #pragma comment(lib, "libssl.lib")
        #pragma comment(lib, "libcrypto.lib")
    #endif
    
    namespace net       = boost::asio;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    using beast::error_code;
    using beast::system_error;
    using net::ip::tcp;
    
    static inline auto out()                                 { return std::osyncstream(std::cout); } 
    static inline void rethrow_handler(std::exception_ptr p) { if (p) std::rethrow_exception(p);   } 
    
    net::awaitable<void> tcp_listen(tcp::endpoint ep) try {
        auto session = [](tcp::socket s) -> net::awaitable<void> {
            auto ep = s.remote_endpoint();
            out() << "New TCP session for " << ep << std::endl;
            net::streambuf buf;
    
            for (error_code ec; !ec;) {
                auto token = redirect_error(net::deferred, ec);
    
                auto n = co_await s.async_read_some(buf.prepare(1024), token);
                buf.commit(n);
    
                if (buf.size() > 0) {
                    auto n = co_await async_write(s, buf, token);
                    buf.consume(n);
                }
            }
    
            out() << "End of " << ep << " TCP session" << std::endl;
        };
    
        auto          ex = co_await net::this_coro::executor;
        tcp::acceptor acc(ex, ep);
        out() << "TCP listening on " << acc.local_endpoint() << std::endl;
        for (;;) {
            co_spawn(ex, session(co_await acc.async_accept(net::deferred)), net::detached);
        }
    } catch (system_error const& se) {
        out() << __FUNCTION__ << " exception: " << se.code().message() << std::endl;
    }
    
    void wss_listen(tcp::endpoint endpoint, net::yield_context yield) try {
        using ws_t = websocket::stream<beast::ssl_stream<beast::tcp_stream>>;
        using sb_t = websocket::stream_base;
        net::ssl::context ctx{net::ssl::context::tlsv12};
        load_server_certificate(ctx);
    
        auto          ex = yield.get_executor();
        tcp::acceptor acceptor(ex, endpoint);
        acceptor.set_option(net::socket_base::reuse_address(true));
        out() << "HTTPS WebSocket listening on port 8989..." << std::endl;
    
        for (;;) {
            ws_t ws(acceptor.async_accept(make_strand(ex), yield), ctx);
            out() << "New WSS session for " << get_lowest_layer(ws).socket().remote_endpoint() << std::endl;
    
            spawn(
                yield.get_executor(),
                [ws = std::move(ws)](net::yield_context yield) mutable {
                    try {
                        ws.next_layer().async_handshake(net::ssl::stream_base::server, yield);
                        ws.set_option(sb_t::timeout::suggested(beast::role_type::server));
                        ws.set_option(sb_t::decorator([](websocket::response_type& res) {
                            res.set(beast::http::field::server,
                                    std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-coro-ssl");
                        }));
    
                        // Accept the websocket handshake
                        for (ws.async_accept(yield);;) {
                            beast::flat_buffer buffer;
                            ws.async_read(buffer, yield);
    
                            // Echo the message back
                            ws.text(ws.got_text());
                            ws.async_write(buffer.data(), yield);
                        }
                    } catch (system_error const& se) {
                        if (se.code() != websocket::error::closed)
                            out() << __FUNCTION__ << " exception: " << se.code().message() << std::endl;
                    }
                },
                net::detached);
        }
    } catch (system_error const& se) {
        out() << __FUNCTION__ << " exception: " << se.code().message() << std::endl;
    }
    
    int main() {
        using namespace std::placeholders;
        net::thread_pool ioc(1);
    
        co_spawn(ioc, tcp_listen({{}, 9898}), net::detached);                            // tcp
        net::spawn(ioc, bind(wss_listen, tcp::endpoint{{}, 8989}, _1), rethrow_handler); // wss
    
        ioc.join();
    }
    

    With some proof-of-pudding:

    [screenshot]

    Questions

    Q. I thought of keeping a global list of connected sockets[...] with a unique id for each of the sockets and one global variable for WebSocket stream.

    To what end? From your description it sure seems you would not need it.

    Q. I will be having one thread per each accepted TCP connection and one thread for WebSocket.

    Again, from your description it sure seems you would not need it. I suggest you don't complicate things unless required.

    Q. By having sockets and websocket stream in global variable I can choose which socket to choose for reading or writing.

    How does that help achieve one of your goals? That said, what is the goal? Currently you have a server that accepts both TCP and WSS connections. From the start of your question I'd expect you to listen only for TCP connections.

    Q. Is that the correct design for implementing multiple TCP connection to one Websocket stream? It doesn't really feel like it :(

    It doesn't, indeed.

    Q. that's why I hope for some insight into this problem.

    The best insight I have is that you don't know what you want/need. Think it through.

    On the other side

    Take it from there. For example, if you also need to handle/route responses, how will you do that? As you've surmised, here you need some multiplexing, where metadata tells you (and the WS server) which responses correlate with which requests. If you cannot describe what you need, you can never implement it.
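
To make the multiplexing idea concrete, here is a minimal sketch of one possible framing, using only the standard library. Everything in it is an assumption for illustration: the `Frame` struct, the `FrameType` values and the 5-byte header layout (4-byte big-endian stream id plus a 1-byte type) are hypothetical, not something Beast or the question prescribes. The idea is that each WebSocket binary message carries exactly one such frame, so no separate length field is needed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Hypothetical wire format (an assumption, not part of any library):
//   bytes 0..3  stream id, big-endian
//   byte  4     frame type
//   bytes 5..   payload (raw bytes read from the TCP connection)
enum class FrameType : std::uint8_t { Open = 0, Data = 1, Close = 2 };

struct Frame {
    std::uint32_t stream_id;
    FrameType     type;
    std::string   payload;
};

constexpr std::size_t kHeaderSize = 5;

// Serialize a frame into the bytes of one WebSocket binary message.
inline std::vector<std::uint8_t> encode(Frame const& f) {
    std::vector<std::uint8_t> out;
    out.reserve(kHeaderSize + f.payload.size());
    for (int shift = 24; shift >= 0; shift -= 8)
        out.push_back(static_cast<std::uint8_t>((f.stream_id >> shift) & 0xFF));
    out.push_back(static_cast<std::uint8_t>(f.type));
    out.insert(out.end(), f.payload.begin(), f.payload.end());
    return out;
}

// Parse one received message; returns nullopt if it is too short
// or carries an unknown frame type.
inline std::optional<Frame> decode(std::vector<std::uint8_t> const& msg) {
    if (msg.size() < kHeaderSize)
        return std::nullopt;
    if (msg[4] > static_cast<std::uint8_t>(FrameType::Close))
        return std::nullopt;

    Frame f{};
    for (std::size_t i = 0; i < 4; ++i)
        f.stream_id = (f.stream_id << 8) | msg[i];
    f.type = static_cast<FrameType>(msg[4]);
    f.payload.assign(msg.begin() + static_cast<std::ptrdiff_t>(kHeaderSize), msg.end());
    return f;
}

// Round-trip example: what goes in must come out unchanged.
inline void roundtrip_example() {
    Frame in{7, FrameType::Data, "GET / HTTP/1.1\r\n"};
    auto  out = decode(encode(in));
    assert(out && out->stream_id == 7 && out->payload == in.payload);
}
```

With something like this, each accepted TCP connection would be given its own `stream_id` and would wrap whatever it reads in a `Data` frame before handing it to the single WebSocket writer; the peer can then use the id to route replies back to the right connection.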

    It would help a lot to clarify what this is for. E.g. is the WSS server written by you? Does it speak a particular protocol? Which version? Do the TCP clients expect to speak a protocol? Etc.
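
If it turns out that connections do have to be looked up by id (for example to route demultiplexed replies), the table does not have to be a global of raw pointers. Note that in the question's snippet, `tcpconn.s = &socket` stores the address of a coroutine-local object, which dangles as soon as `tcp_session` returns. Below is a hedged sketch of an alternative based on `std::weak_ptr`. `Session` and `SessionRegistry` are hypothetical names, and `Session` is only a stand-in for an object that would own the `tcp::socket`. It is deliberately not thread-safe, on the assumption that everything runs on a single thread or strand, as with the `thread_pool ioc(1)` above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <string>

// Stand-in for a per-connection object (hypothetical). In the real program
// this would own the tcp::socket and queue writes to it.
struct Session {
    std::uint32_t id = 0;
    std::string   received;

    void deliver(std::string const& data) { received += data; }
};

// Maps ids to sessions without owning them: the coroutine that runs a
// session holds the shared_ptr, the registry only observes it.
// Not thread-safe by design; assumed to be used from one thread or strand.
class SessionRegistry {
  public:
    std::shared_ptr<Session> add() {
        auto s = std::make_shared<Session>();
        s->id  = next_id_++;
        sessions_[s->id] = s; // stored as weak_ptr
        return s;
    }

    // Returns false if the id is unknown or the session has already ended.
    bool route(std::uint32_t id, std::string const& data) {
        auto it = sessions_.find(id);
        if (it == sessions_.end())
            return false;
        if (auto s = it->second.lock()) {
            s->deliver(data);
            return true;
        }
        sessions_.erase(it); // session is gone, drop the stale entry
        return false;
    }

    std::size_t size() const { return sessions_.size(); }

  private:
    std::uint32_t                                   next_id_ = 1;
    std::map<std::uint32_t, std::weak_ptr<Session>> sessions_;
};

// Usage example: a reply for a finished session is rejected instead of
// being written through a dangling pointer.
inline void registry_example() {
    SessionRegistry reg;
    auto s  = reg.add();
    auto id = s->id;
    assert(reg.route(id, "pong"));
    s.reset(); // the session's coroutine has finished
    assert(!reg.route(id, "late"));
}
```

The registry itself can live in `main` (or in whatever object owns the WebSocket stream) and be passed by reference to the listeners, which avoids global state altogether.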