php zeromq distributed-computing pyzmq jzmq

Task progress from a ZeroMQ worker


I'm fairly new to ZeroMQ. I have a simple REQ/REP queue like the one below. I am using PHP, but that doesn't matter, as any language binding would be fine for me. This is the client that requests a task:

$ctx = new ZMQContext();
$req = new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
$req->connect('tcp://localhost:5454');
$req->send("Export Data as Zip");
echo $req->recv() . PHP_EOL;

And this is the worker that actually performs the task:

$ctx = new ZMQContext();
$srvr = new ZMQSocket($ctx, ZMQ::SOCKET_REP);
$srvr->bind("tcp://*:5454");
echo "Server is started at port 5454" . PHP_EOL;
while(true)
{
    $msg = $srvr->recv();
    echo "Message = " . $msg . PHP_EOL;
    // Do the work here, takes 10 min, knows the count of lines added and remaining
    $srvr->send($msg . " is exported as zip file" . date('H:i:s'));
}

As the export task takes about 10 minutes, I want to connect to the server from a different client and get the progress (percentage) of the task. I am wondering if that's even a valid approach.

I tried the approach below. The REQ/REP part works, but I get nothing on the PUB/SUB part.

Server part

$ctx = new ZMQContext();
$srvr = new ZMQSocket($ctx, ZMQ::SOCKET_REP);
$srvr->bind("tcp://*:5454");

// add PUB socket to publish progress
$c = new ZMQContext();
$p = new ZMQSocket($c, ZMQ::SOCKET_PUB);
$p->bind("tcp://*:5460");

echo "Server is started at port 5454" . PHP_EOL;
$prog = 0;
while(true)
{
    $p->send($prog++ . '%'); // this part doesn't get to the progress client
    $msg = $srvr->recv();
    echo "Message = " . $msg . PHP_EOL;
    sleep(2);// some long task
    $srvr->send($msg . " Done zipping " . date('H:i:s'));
}

Progress client

$ctx = new ZMQContext();
$stat = new ZMQSocket($ctx, ZMQ::SOCKET_SUB);
$stat->connect('tcp://localhost:5460');
while (true){
    echo $stat->recv() . PHP_EOL; //nothing shows here
}

Request client

$ctx = new ZMQContext();
$req = new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
$req->connect('tcp://localhost:5454');
for($i=0;$i<100;$i++){
    $req->send("$i : Zip the file please");
    echo $i . ":" . $req->recv().PHP_EOL; //works and get the output
}

Solution

  • The concept is feasible, but some tuning is needed:

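    Every counterparty of the PUB socket, i.e. every SUB socket, has to set up a subscription, because by default a SUB socket subscribes to nothing and receives nothing. At minimum, set an empty subscription via .setsockopt( ZMQ_SUBSCRIBE, "" ), which means receive all topics ( nothing is filtered out ). In the PHP binding this is $stat->setSockOpt( ZMQ::SOCKOPT_SUBSCRIBE, "" ) on the progress client.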
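The subscription point can be sketched with the php-zmq binding from the question. Publisher and subscriber run in one process over an inproc:// endpoint only to keep the sketch self-contained; the endpoint name inproc://progress-demo and the 200 ms pause are assumptions made for this demo. In the question's setup, the only change needed is the setSockOpt line in the progress client.

```php
<?php
// Sketch: a SUB socket receives nothing until it subscribes.
// Assumes the php-zmq extension; the endpoint name is made up for this demo.
$ctx = new ZMQContext();

$pub = new ZMQSocket($ctx, ZMQ::SOCKET_PUB);
$pub->bind('inproc://progress-demo');

$sub = new ZMQSocket($ctx, ZMQ::SOCKET_SUB);
// Empty prefix = subscribe to every message.
// Without this line recv() would block forever.
$sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
$sub->connect('inproc://progress-demo');

// Give the connection a moment to settle ("slow joiner"):
// anything published before the subscription arrives is dropped silently.
usleep(200000);

$pub->send('42%');
$progress = $sub->recv();
echo $progress . PHP_EOL;
```

Because PUB drops messages that have no matching subscriber yet, a progress client that connects late simply starts with the next published value.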

    Next, both the PUB side and the SUB side ought to get .setsockopt( ZMQ_CONFLATE, 1 ) configured ( libzmq 4.x, set before bind / connect ), as there is no value in feeding all intermediate values through the en-queue / de-queue pipeline when the only value of interest is in the last, most recent message.
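A minimal sketch of a "latest value only" subscriber follows. It assumes a php-zmq build against libzmq 4.x that exposes ZMQ::SOCKOPT_CONFLATE; since that may not hold for every build, the constant is checked at runtime. The helper name latestOnlySubscriber() is hypothetical, and the endpoint is the one from the question.

```php
<?php
// Sketch: keep only the newest progress message on a SUB socket.
// latestOnlySubscriber() is a hypothetical helper, not part of php-zmq.
function latestOnlySubscriber(ZMQContext $ctx, string $endpoint): ZMQSocket
{
    $sub = new ZMQSocket($ctx, ZMQ::SOCKET_SUB);

    // Assumption: the constant exists only when php-zmq was built
    // against a libzmq version that supports ZMQ_CONFLATE.
    if (defined('ZMQ::SOCKOPT_CONFLATE')) {
        // Set before connect() so it applies to the connection made below.
        $sub->setSockOpt(ZMQ::SOCKOPT_CONFLATE, 1);
    }

    // Still required: subscribe to everything.
    $sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
    $sub->connect($endpoint);

    return $sub;
}

$ctx  = new ZMQContext();
$stat = latestOnlySubscriber($ctx, 'tcp://localhost:5460');
```

Note that ZMQ_CONFLATE does not support multipart messages, so this only suits single-frame progress values such as "42%".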

    As a rule, the non-blocking mode of the ZeroMQ calls ought to be preferred ( .recv( ..., flags = ZMQ_NOBLOCK ), named ZMQ_DONTWAIT in newer API versions ), or Poller.poll() ought to be used to test first for the presence of a message before spending more effort on reading its content from ZeroMQ. Simply put, there are not many cases where blocking-mode calls serve well in a production-grade system.
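The polling idea might look like the sketch below in PHP, using ZMQPoll instead of a blocking recv(). The helper name pollProgress() and the endpoint inproc://poll-demo are hypothetical, and the timeout is taken to be in milliseconds, which is worth verifying against the php-zmq build in use.

```php
<?php
// Sketch: wait for a progress message with a timeout instead of blocking.
// pollProgress() is a hypothetical helper, not part of php-zmq.
function pollProgress(ZMQSocket $sub, int $timeoutMs): ?string
{
    $poll = new ZMQPoll();
    $poll->add($sub, ZMQ::POLL_IN);

    $readable = [];
    $writable = [];
    // poll() returns the number of sockets that have pending events.
    $events = $poll->poll($readable, $writable, $timeoutMs);

    if ($events > 0) {
        return $sub->recv();   // a message is waiting, so this returns at once
    }
    return null;               // nothing arrived within the timeout
}

// In-process demo so the sketch is self-contained.
$ctx = new ZMQContext();
$pub = new ZMQSocket($ctx, ZMQ::SOCKET_PUB);
$pub->bind('inproc://poll-demo');

$sub = new ZMQSocket($ctx, ZMQ::SOCKET_SUB);
$sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
$sub->connect('inproc://poll-demo');

$before = pollProgress($sub, 100);    // nothing has been published yet
$pub->send('10%');
$after  = pollProgress($sub, 1000);   // picks up the published value
```

A progress client built this way can do other work, or give up cleanly, when the worker is idle and publishes nothing.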

    Some further tuning may also help the PUB side in case a massive number of connections arrives from an unrestricted pool of SUB-side peers, since the PUB socket has to create, manage and maintain resources for each of these counterparties.