cmultithreadinglibwebsockets

Receiving Websocket data on separate threads with libwebsocket


I'm trying to set up two websocket connections that receive on different threads. I'm aware that only one lws_context should exist, but on https://libwebsockets.org/, they say that you can have n event loops running on n different threads, although I've struggled to find any example code to help me out with this.

To get around this, I've set the code up to run any parsing on a separate thread. The issue with this is that it isn't optimal, as I assume time is lost in the event loop when deciding which callback function is called. What can I look into as an option here? I've had a read of a lot of the Readme's that they provide, but I haven't been able to find anything to help.

Below is a minimal reproducible example.

#include <stdio.h>
#include <stdlib.h>
#include <libwebsockets.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>

static struct lws *fclient_wsi;
static struct lws *client_wsi;
pthread_t* threads;
static int interrupted = 0;
static volatile bool ready1 = false;
static volatile bool ready2 = false;
static int counter = 0;

// Use this to get parse times
double get_time()
{
    LARGE_INTEGER t, f;
    QueryPerformanceCounter(&t);
    QueryPerformanceFrequency(&f);
    return (double)t.QuadPart/(double)f.QuadPart;
}
static void* AwaitParse(void* data){
    bool* ready = (bool *) data;
    while(!interrupted){
        if(!*ready)
            continue;
        double done = get_time();
        printf("Recieved at %f\n", done);

        *ready = false;
    }
}
static int ParseData1(struct lws *wsi, enum lws_callback_reasons reason,
                      void *user, void *in, size_t len)
{
    int res;
    switch (reason) {
        case LWS_CALLBACK_CLIENT_RECEIVE:
            if(!ready1){
                // Do something here
                counter++;
                if(counter > 10)
                    interrupted = 1;
                ready1 = true;
                lwsl_user("Data: %s\n", (const char*)in);
            }
            break;
        case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
            lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
                     in ? (char *)in : "(null)");
            client_wsi = NULL;
            break;

        case LWS_CALLBACK_CLIENT_ESTABLISHED:
            lwsl_user("%s: established\n", __func__);
            break;


        case LWS_CALLBACK_CLIENT_CLOSED:
            client_wsi = NULL;
            break;

        default:
            break;
    }

    return lws_callback_http_dummy(wsi, reason, user, in, len);
}
static int ParseData2(struct lws *wsi, enum lws_callback_reasons reason,
                      void *user, void *in, size_t len)
{
    int res;
    switch (reason) {
        case LWS_CALLBACK_CLIENT_RECEIVE:
            if(!ready2) {
                ready2 = true;
                lwsl_user("Data: %s\n", (const char*)in);
            }
            break;
        case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
            lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
                     in ? (char *)in : "(null)");
            client_wsi = NULL;
            break;

        case LWS_CALLBACK_CLIENT_ESTABLISHED:
            lwsl_user("%s: established\n", __func__);
            break;


        case LWS_CALLBACK_CLIENT_CLOSED:
            client_wsi = NULL;
            break;

        default:
            break;
    }

    return lws_callback_http_dummy(wsi, reason, user, in, len);
}
static const struct lws_extension extensions[] = {
        {
                "permessage-deflate",
                lws_extension_callback_pm_deflate,
                "permessage-deflate"
                "; client_no_context_takeover"
                "; client_max_window_bits"
        },
        { NULL, NULL, NULL /* terminator */ }
};

static const struct lws_protocols protocols[] = {
        {
                "data1-ws",
                ParseData1,
                      0,
                         0
        },
        {
                "data2-ws",
                ParseData2,
                      0,
                         0,
        },
        { NULL, NULL, 0, 0 }
};

static void sigint_handler(int sig)
{
    interrupted = 1;
}
int main() {
    threads = malloc(2 * sizeof(pthread_t));
    struct lws_context_creation_info info;
    struct lws_client_connect_info i;
    struct lws_context *context;
    int n = 0, logs = LLL_USER | LLL_WARN;

    signal(SIGINT, sigint_handler);

    lws_set_log_level(logs, NULL);
    memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
    info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
    info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
    info.protocols = protocols;
    info.extensions = extensions;
    info.fd_limit_per_thread = 1 + 1 + 1 + 1;
    context = lws_create_context(&info);
    if (!context) {
        lwsl_err("Private WS init failed\n");
        return -1;
    }
    for(int j = 0; j < 2; j++){
        memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */
        i.context = context;
        i.port = 443;
        i.address = "fstream.binance.com";
        i.path = "/ws/btcusdt@bookTicker";
        i.host = i.address;
        i.origin = i.address;
        i.ssl_connection = LCCSCF_USE_SSL;
        i.protocol = protocols[j].name;
        i.pwsi = (j == 0 ? &fclient_wsi : &client_wsi);
        lws_client_connect_via_info(&i);
    }

    pthread_create(&threads[0], NULL, AwaitParse, &ready1);
    pthread_create(&threads[1], NULL, AwaitParse, &ready2);

    while (n >= 0 && !interrupted)
        n = lws_service(context, 0);
    
    pthread_join(threads[0], NULL);
    pthread_join(threads[1], NULL);

    lws_context_destroy(context);
    lwsl_user("Completed Combined Streams\n");
    return 0;
}

What I would expect is that the times output would look very similar, however there is a regular difference in parsing times of 0.2ms, which is quite a significant delay. I printed the output as well to ensure that the same message is being received, and this is the case for most (but not all) of the responses.


Solution

  • In your code, you are connecting two clients via lws_client_connect_via_info, each one with its callback, ParseData1 and ParseData2, that inform the waiting threads about the availability of a payload using the ready1 and ready2 flags.

    If I understand correctly your question: I think there is no reason to expect the exact same temporal behavior from two separate connections: they can act differently depending upon the network status. The time when a message comes to your callback routine is not guaranteed to be synchronized with the same time on a different connection.

    I run your code on my setup and found that the timing and the number of replies on the two connections can change dramatically between runs. E.g. in the following the outputs of the AwaitParsex routines for a program run:

    Received 1 at 1719405796014.936035
    Received 2 at 1719405796014.952881
    Received 2 at 1719405796197.645020
    Received 2 at 1719405796201.722900
    Received 1 at 1719405796326.232910
    Received 2 at 1719405796326.247070
    Received 1 at 1719405796326.260986
    Received 2 at 1719405796326.272949
    Received 1 at 1719405796326.285889
    Received 1 at 1719405796326.301025
    

    The parsing times are sometimes very close, and sometimes far apart, anyway a far different situation from the regular 0.2ms you found. From my runs, it seems that the decision about which callback to use is not using much CPU time, stated that different callbacks are spaced as low as 10 microseconds.

    If you need to maximize the parse throughput, you may create a pool of threads linked to the same connection, and/or increase the number of connections.