asynchronous, concurrency, zeromq, single-threaded

How to Perform Concurrent Request-Reply for Asynchronous Tasks with ZeroMQ?


Intention

I want to allow a client to send a task to a server at a fixed address. The server may take that task and perform it at some arbitrary point in the future, while still accepting requests from other clients in the meantime. After performing the task, the server replies to the client, which may have been blocked in a waiting state on the reply. Tasks and clients arrive dynamically, so there can't be a fixed initial number of either. The work is done in a non-thread-safe context, so workers can't live on separate threads; all work must take place in a single thread.

Implementation

The following example¹ is not a complete implementation of the server, only a compilable fragment of the sequence that should be able to take place (but in reality hangs). Two clients each send an integer; the server takes the first request, then the second request, echoes a reply to the first, then echoes a reply to the second. The intention isn't to get the responses ordered, only to let the server hold multiple requests simultaneously. What actually happens is that the second worker hangs waiting on its request. This is what confuses me, as DEALER sockets should route outgoing messages with a round-robin strategy.

#include <unistd.h>
#include <stdio.h>
#include <zmq.h>
#include <sys/wait.h>

int client(int num)
{
    void *context, *client;
    int buf[1];

    context = zmq_ctx_new();
    client = zmq_socket(context, ZMQ_REQ);
    zmq_connect(client, "tcp://localhost:5559");
    *buf = num;
    zmq_send(client, buf, 1, 0);   /* send one byte carrying the client's number */
    *buf = 0;
    zmq_recv(client, buf, 1, 0);
    printf("client %d receiving: %d\n", num, *buf);
    zmq_close(client);
    zmq_ctx_destroy(context);
    return 0;
}

/* Shovel one complete (possibly multipart) message from `from` to `to`,
   preserving frame boundaries with ZMQ_SNDMORE. */
void multipart_proxy(void *from, void *to)
{
    zmq_msg_t message;

    while (1) {
        zmq_msg_init(&message);
        zmq_msg_recv(&message, from, 0);
        int more = zmq_msg_more(&message);
        zmq_msg_send(&message, to, more ? ZMQ_SNDMORE : 0);
        zmq_msg_close(&message);
        if (!more) break;
    }

}

int main(void)
{
    int status;
    if (fork() == 0) {
        client(1);
        return 0;
    }
    if (fork() == 0) {
        client(2);
        return 0;
    }
    /* SERVER */
    void *context, *frontend, *backend, *worker1, *worker2;
    int wbuf1[1], wbuf2[1];

    context = zmq_ctx_new();
    frontend = zmq_socket(context, ZMQ_ROUTER);
    backend = zmq_socket(context, ZMQ_DEALER);
    zmq_bind(frontend, "tcp://*:5559");     /* clients connect here over TCP  */
    zmq_bind(backend, "inproc://workers");  /* workers connect here in-process */

    worker1 = zmq_socket(context, ZMQ_REP);
    zmq_connect(worker1, "inproc://workers");
    /* shuttle client 1's request from the ROUTER to the DEALER */
    multipart_proxy(frontend, backend);
    *wbuf1 = 0;
    zmq_recv(worker1, wbuf1, 1, 0);
    printf("worker1 receiving: %d\n", *wbuf1);

    worker2 = zmq_socket(context, ZMQ_REP);
    zmq_connect(worker2, "inproc://workers");
    /* shuttle client 2's request from the ROUTER to the DEALER */
    multipart_proxy(frontend, backend);
    *wbuf2 = 0;
    zmq_recv(worker2, wbuf2, 1, 0);   /* hangs here: the request never reaches worker2 */
    printf("worker2 receiving: %d\n", *wbuf2);

    /* echo the replies back: worker REP -> backend DEALER -> frontend ROUTER -> client */
    zmq_send(worker1, wbuf1, 1, 0);
    multipart_proxy(backend, frontend);

    zmq_send(worker2, wbuf2, 1, 0);
    multipart_proxy(backend, frontend);

    wait(&status);
    wait(&status);   /* reap both client children */
    zmq_close(frontend);
    zmq_close(backend);
    zmq_close(worker1);
    zmq_close(worker2);
    zmq_ctx_destroy(context);
    return 0;
}

Other Options

I have looked at CLIENT and SERVER sockets, and on paper they appear capable of this; in practice, however, they are new enough that the system version of ZeroMQ I have doesn't yet support them. If this can't be done in ZeroMQ, alternative suggestions are very welcome.
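For reference, on a libzmq built with the draft API enabled, the exchange would reduce to something like the following sketch, since a SERVER socket can reply to any of its clients, in any order, keyed by routing-id. The address and one-byte payload here are placeholders:

/* Sketch only: needs a libzmq build with draft sockets (CLIENT/SERVER) enabled. */
#define ZMQ_BUILD_DRAFT_API
#include <stdint.h>
#include <zmq.h>

int main(void)
{
    void *ctx = zmq_ctx_new();
    void *server = zmq_socket(ctx, ZMQ_SERVER);
    zmq_bind(server, "tcp://*:5559");

    /* take a request and remember which peer it came from */
    zmq_msg_t request;
    zmq_msg_init(&request);
    zmq_msg_recv(&request, server, 0);
    uint32_t peer = zmq_msg_routing_id(&request);
    zmq_msg_close(&request);

    /* ... keep accepting other requests, do the work, and reply whenever
       ready: the routing-id lets the reply find its way back out of order */
    zmq_msg_t reply;
    zmq_msg_init_size(&reply, 1);
    ((char *)zmq_msg_data(&reply))[0] = 42;
    zmq_msg_set_routing_id(&reply, peer);
    zmq_msg_send(&reply, server, 0);

    zmq_close(server);
    zmq_ctx_destroy(ctx);
    return 0;
}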


¹ Based on the Shared Queue section of the ZeroMQ guide.


Solution

  • Let me share a view on how ZeroMQ could meet the Intention defined above.

    Let's use the ZeroMQ Scalable Formal Communication Pattern archetypes as they are ready to operate now, rather than as we might wish them to behave at some uncertain point in their future evolution.

    There is no need to hesitate to use several ZeroMQ connections between the server and a dynamically arriving and departing herd of client instances.

    For example:


    Client .connect()-s a REQ-socket to Server-address:port-A and uses this connection to submit a "job"-ticket for processing

    Client .connect()-s a SUB-socket to Server-address:port-B to listen (when present) for published announcements of completed "job"-tickets whose results the Server is ready to deliver

    Client .connect()-s another REQ-socket to Server-address:port-C. Upon hearing, over the SUB-socket, a broadcast announcement that its "job"-ticket is complete, it requests final delivery of the results over this connection, proving its right to receive them by presenting the matching job-ticket-AUTH-key. The same socket then carries a POSACK-message to the Server once the client has the results correctly "in hands". A sketch of this client-side flow appears below.
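    A minimal client-side sketch of that flow, assuming illustrative ports 6000/6001/6002 for port-A/-B/-C, a two-frame ticket-ID + AUTH-key reply, a hypothetical recv_str() helper, and arbitrary field sizes (none of this is a fixed protocol):

    #include <string.h>
    #include <stdio.h>
    #include <zmq.h>

    /* helper: receive one frame into buf (cap-1 bytes max) and NUL-terminate it;
       zmq_recv reports the full message size even when it truncates */
    static int recv_str(void *sock, char *buf, int cap)
    {
        int n = zmq_recv(sock, buf, cap - 1, 0);
        if (n < 0) n = 0;
        if (n > cap - 1) n = cap - 1;
        buf[n] = '\0';
        return n;
    }

    int main(void)
    {
        void *ctx = zmq_ctx_new();
        char job_id[33], auth_key[33], result[257], ack[33];

        /* 1) submit the job over a REQ-socket; the reply carries ticket-ID + AUTH-key */
        void *submit = zmq_socket(ctx, ZMQ_REQ);
        zmq_connect(submit, "tcp://localhost:6000");    /* port-A */
        zmq_send(submit, "my-task", 7, 0);
        recv_str(submit, job_id, sizeof job_id);        /* frame 1: ticket-ID */
        recv_str(submit, auth_key, sizeof auth_key);    /* frame 2: AUTH-key  */

        /* 2) subscribe to completion announcements for this ticket only */
        void *news = zmq_socket(ctx, ZMQ_SUB);
        zmq_connect(news, "tcp://localhost:6001");      /* port-B */
        zmq_setsockopt(news, ZMQ_SUBSCRIBE, job_id, strlen(job_id));
        recv_str(news, ack, sizeof ack);   /* blocks until this job-ticket is announced */

        /* 3) fetch the results over another REQ-socket, proving the AUTH-key */
        void *fetch = zmq_socket(ctx, ZMQ_REQ);
        zmq_connect(fetch, "tcp://localhost:6002");     /* port-C */
        zmq_send(fetch, job_id, strlen(job_id), ZMQ_SNDMORE);
        zmq_send(fetch, auth_key, strlen(auth_key), 0);
        recv_str(fetch, result, sizeof result);
        printf("results: %s\n", result);

        /* 4) POSACK the delivery on the same socket once the results are in hand */
        zmq_send(fetch, "POSACK", 6, 0);
        recv_str(fetch, ack, sizeof ack);  /* final confirmation retires the ticket */

        zmq_close(submit); zmq_close(news); zmq_close(fetch);
        zmq_ctx_destroy(ctx);
        return 0;
    }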


    Server exposes a REP-socket on port-A to respond to each client's ad-hoc "job"-ticket request, confirming that the ticket has been "accepted" and handing back a job-ticket-AUTH-key for the later pickup of results

    Server exposes a PUB-socket on port-B to announce any and all not-yet-picked-up "finished" job-tickets

    Server exposes another REP-socket on port-C to receive every attempt to retrieve "job"-ticket results. Upon verifying the delivered job-ticket-AUTH-key, the Server decides whether the request carried a matching key and a proper message with the results should indeed be delivered, or whether no match happened, in which case the reply carries some other payload (the exact logic is left for further thought, so as to prevent brute-forcing, eavesdropping, and similar attacks aimed at stealing the results). A single-threaded setup sketch of these three server-side sockets follows below.
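    On the Server side, all three sockets can live in one single thread, multiplexed with zmq_poll(), which keeps the non-thread-safe work context intact. A skeleton sketch with the same illustrative ports as above and the handlers left as comments:

    #include <stdio.h>
    #include <zmq.h>

    int main(void)
    {
        void *ctx     = zmq_ctx_new();
        void *intake  = zmq_socket(ctx, ZMQ_REP);  /* port-A: accept "job"-tickets         */
        void *news    = zmq_socket(ctx, ZMQ_PUB);  /* port-B: announce finished tickets    */
        void *deliver = zmq_socket(ctx, ZMQ_REP);  /* port-C: AUTH-checked result delivery */
        zmq_bind(intake,  "tcp://*:6000");
        zmq_bind(news,    "tcp://*:6001");
        zmq_bind(deliver, "tcp://*:6002");

        zmq_pollitem_t items[] = {
            { intake,  0, ZMQ_POLLIN, 0 },
            { deliver, 0, ZMQ_POLLIN, 0 },
        };

        while (1) {
            /* a finite timeout leaves idle cycles for doing the actual work
               and for re-announcing finished tickets, all on this one thread */
            zmq_poll(items, 2, 100);

            if (items[0].revents & ZMQ_POLLIN) {
                /* recv the job request; reply with ticket-ID + AUTH-key (multipart) */
            }
            if (items[1].revents & ZMQ_POLLIN) {
                /* recv ticket-ID + AUTH-key; on a match reply with the results,
                   otherwise with a decoy payload; await the POSACK on a later turn */
            }

            /* advance pending jobs here, then publish finished ticket-IDs on `news` */
        }
    }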


    Clients need not stay waiting for results live/online, and can survive a certain amount of loss-of-signal, L2/L3 errors, or network-storm stress

    Clients need only keep their job-ticket-ID and job-ticket-AUTH-key for later retrieval of the Server-processed, Server-maintained, AUTH-gated results

    Server will keep listening for new jobs

    Server will accept new job-tickets, attaching a privately generated job-ticket-AUTH-key to each

    Server will process job-tickets at whatever time it chooses to do so

    Server will maintain a circular-buffer of completed job-tickets to be announced (a sketch of such a buffer follows this list)

    Server will announce, in due time and with repetition as it sees fit, the job-tickets that are ready for client-initiated retrieval

    Server will accept new retrieval requests

    Server will verify each client request against the announced job-ticket-IDs and test whether the presented job-ticket-AUTH-key matches as well

    Server will respond to both matching and non-matching job-ticket-ID retrieval requests

    Server will remove a job-ticket-ID from the circular-buffer only after both a POSACK-ed AUTH-match before retrieval and a POSACK-message re-confirming delivery to the client
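    Finally, a minimal sketch of such a circular-buffer of completed job-tickets, with the ring size and field widths being arbitrary choices for illustration only:

    #include <string.h>
    #include <zmq.h>

    #define RING_SIZE 64

    typedef struct {
        char job_id[32];
        char auth_key[32];
        char result[256];
        int  occupied;   /* slot holds a not-yet-picked-up ticket */
    } ticket_t;

    static ticket_t ring[RING_SIZE];
    static int ring_head = 0;

    /* store a finished ticket; the oldest slot is reused when the ring is full */
    void ring_put(const char *id, const char *key, const char *res)
    {
        ticket_t *t = &ring[ring_head];
        ring_head = (ring_head + 1) % RING_SIZE;
        strncpy(t->job_id,   id,  sizeof t->job_id   - 1);
        strncpy(t->auth_key, key, sizeof t->auth_key - 1);
        strncpy(t->result,   res, sizeof t->result   - 1);
        t->occupied = 1;
    }

    /* periodically re-announce every ticket still waiting for pickup;
       publishing the ticket-ID lets clients prefix-subscribe to their own */
    void ring_announce(void *pub)
    {
        for (int i = 0; i < RING_SIZE; i++)
            if (ring[i].occupied)
                zmq_send(pub, ring[i].job_id, strlen(ring[i].job_id), 0);
    }

    /* retire a ticket only after the AUTH-match and the POSACK both arrived */
    void ring_retire(const char *id)
    {
        for (int i = 0; i < RING_SIZE; i++)
            if (ring[i].occupied && strcmp(ring[i].job_id, id) == 0)
                ring[i].occupied = 0;
    }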