I am trying to give feedback in a web page that lets the user start a long process from an Excel sheet (sigh, yes...). Each line of data takes about 1 second to process, and a typical sheet has between 40 and 100 items, so the overall processing time can exceed a minute.
I am displaying a preview of the data in the page, starting the process through a websocket, and would like to show the progress over the same websocket.
The processing itself is done by an external package and the page complexity is minimal, so I have wrapped it in a Lite single file.
My problem is that the long processing started in the websocket route blocks the feedback until it has finished, and all the progress events are sent at once at the end. From what I understand, this is related to the Mojolicious event loop, and I should start the processing separately to avoid freezing the websocket handling.
Note that I have tried a separate feedback channel with an EventSource to push progress to the client during the processing, but it behaves the same way: everything arrives at once at the end.
Here is my simplified code; I am using sleep() to simulate the long process. I start it with
perl mojo_notify_ws.pl daemon
Could you suggest how to modify the websocket route to allow real-time feedback?
use Mojolicious::Lite;
use Mojo::JSON qw(encode_json decode_json j);
use Data::Dumper;
$|++;
any '/' => sub {
my $c = shift;
$c->render('index');
};
my $peer;
websocket '/go' => sub {
use Carp::Always;
my $ws = shift;
$peer = $ws->tx;
app->log->debug(sprintf 'Client connected: %s', Dumper $peer->remote_address);
# do not subscribe to 'text' else 'json' won't work
#$ws->on(text => sub {
# my ($ws, $msg) = @_;
# app->log->debug("Received text from websocket: `$msg`");
# });
# $peer->send('{"type": "test"}');
# say 'default inactivity timeout='. (p $ws->inactivity_timeout());
$ws->inactivity_timeout(120);
$ws->on(json => sub {
my ($ws, $msg) = @_;
app->log->debug('Received from websocket:', Dumper(\$msg));
unless($msg){
app->log->debug('Received empty message? WTF?!');
return;
}
my $prompt = $msg->{cmd};
return unless $prompt;
app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');
# simulate
my $loop = Mojo::IOLoop->singleton;
# $loop->subprocess( sub {
# my $sp = shift;
for my $cell (1..3) {
# $loop->delay( sub {
app->log->debug("sending cell $cell");
my $payload = {
type => 'ticket',
cell => $cell,
result => $cell % 2 ? 'OK' : 'NOK'
};
$ws->send( { json => $payload } );
sleep(2);
# $loop->timer(2, sub {say 'we have waited 2 secs!';})->wait;
# });
};
# }, sub {} );#subprocess
app->log->debug('sending end of process ->websocket');
$ws->send({json => { type => 'end' } });
});
$ws->on(finish => sub {
my ($ws, $code, $reason) = @_;
$reason = '' unless defined $reason;
app->log->debug("Client disconnected: $code ($reason)");
});
app->log->debug('Reached end of ws route definition');
};
app->start;
__DATA__
@@ index.html.ep
<html>
<head>
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.js"></script>
<script>
var timerId = 0; // same spelling as used in keepAlive() and cancelKeepAlive()
function keepAlive(ws) {
var timeout = 20000;
if (ws.readyState == ws.OPEN) {
ws.send('ping');
}
timerId = setTimeout(function(){keepAlive(ws);}, timeout);
}
function cancelKeepAlive() {
if (timerId) {
clearTimeout(timerId);
}
}
function flagCell(cell, result){
var id='#CELL_' + cell;
var cell = $(id);
if(cell) {
if (result=='OK') {
cell.css('color', 'green');
cell.text('⯲');
} else {
cell.css('color','red');
cell.text('✘');
}
}
}
function process(){
//debugger;
console.log('Opening WebSocket');
var ws = new WebSocket('<%= url_for('go')->to_abs %>');
ws.onopen = function (){
console.log('Websocket Open');
//keepAlive(ws);
ws.send(JSON.stringify({cmd: "let's go Perl"}));
};
//incoming
ws.onmessage = function(evt){
var data = JSON.parse(evt.data);
console.log('WS received '+JSON.stringify(data));
if (data.type == 'ticket') {
console.log('Server has send a status');
console.log('Cell:'+data.cell + ' res:' + data.result);
flagCell(data.cell, data.result);
} else if (data.type == 'end') {
console.log('Server has finished.');
//cancelKeepAlive();
ws.close();
} else {
console.log('Unknown message:' + evt.data);
}
};
ws.onerror = function (evt) {
console.log('ws error:', evt.data);
}
ws.onclose = function (evt) {
if(evt.wasClean) {
console.log('Connection closed cleanly');
} else {
console.log('Connection reseted');
}
console.log('Code:'+ evt.code + ' Reason:' + evt.reason);
}
}
</script>
</head>
<body>
<button type=button id='upload' onclick="process();">Go</button><br>
<div style='font-family:sans;'>
<table border="1px">
<tr><td id="CELL_1"> </td><td>Foo</td></tr>
<tr><td id="CELL_2"> </td><td>Bar</td></tr>
<tr><td id="CELL_3"> </td><td>Baz</td></tr>
</table>
</div>
</body>
</html>
EDIT:
Grinnz has provided a suitable solution, but for the record, here is my attempt with a Mojo::IOLoop::Subprocess callback, with which I get no feedback at all. I am running on Linux, where Subprocess seems to fork, and the parent process seems to terminate the websocket immediately.
Edit: no, I eventually found that the $ws->send() is in the wrong place: it should be in the second sub{}, which runs on the parent side, and not in the first, which runs in the child process. This code should be refactored to have one subprocess per loop iteration and a final step for the end notification.
Here is the modified on(json) handler:
$ws->on(json => sub {
my ($ws, $msg) = @_;
app->log->debug('Received from websocket:', Dumper(\$msg));
unless($msg){
app->log->debug('Received empty message? WTF?!');
return;
}
my $prompt = $msg->{cmd};
return unless $prompt;
app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');
# my $loop = Mojo::IOLoop->singleton;
my $subprocess = Mojo::IOLoop::Subprocess->new;
app->log->debug("we are pid $$");
$subprocess->run(
sub {
my $sp = shift;
for my $cell (1..3) {
app->log->debug("starting process for cell $cell in pid $$");
sleep(2);
app->log->debug("sending cell $cell to ws");
my $payload = {
type => 'ticket',
cell => $cell,
result => $cell % 2 ? 'OK' : 'NOK'
};
$ws->send( { json => $payload } ); # FIXME: actually this line is in the wrong place
# and should be in the second sub{}
};
},
sub {
my ($sp, $err, @results) = @_;
$ws->reply->exception($err) and return if $err;
app->log->debug('sending end of process ->websocket');
$ws->send({json => { type => 'end' } });
});
# Start event loop if necessary
$subprocess->ioloop->start unless $subprocess->ioloop->is_running;
});
And the corresponding log:
[Wed Oct 3 19:51:58 2018] [debug] Received: `let's go Perl`
[Wed Oct 3 19:51:58 2018] [debug] we are pid 8898
[Wed Oct 3 19:51:58 2018] [debug] Client disconnected: 1006 ()
[Wed Oct 3 19:51:58 2018] [debug] starting process for cell 1 in pid 8915
[Wed Oct 3 19:52:00 2018] [debug] sending cell 1 to ws
[Wed Oct 3 19:52:00 2018] [debug] starting process for cell 2 in pid 8915
[Wed Oct 3 19:52:02 2018] [debug] sending cell 2 to ws
[Wed Oct 3 19:52:02 2018] [debug] starting process for cell 3 in pid 8915
[Wed Oct 3 19:52:04 2018] [debug] sending cell 3 to ws
[Wed Oct 3 19:52:04 2018] [debug] sending end of process ->websocket
[Wed Oct 3 19:52:04 2018] [debug] Client disconnected: 1005 ()
I also experimented with Mojo::IOLoop->delay to build a more complicated sequence of steps, in a way similar to the Promise solution, but this one also sends all the notifications at once at the end:
$ws->on(json => sub {
my ($ws, $msg) = @_;
app->log->debug('Received from websocket:', Dumper(\$msg));
unless($msg){
app->log->debug('Received empty message? WTF?!');
return;
}
my $prompt = $msg->{cmd};
return unless $prompt;
app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');
app->log->debug("we are pid $$");
my @steps;
for my $cell (1..3) {
push @steps,
sub {
app->log->debug("subprocess for cell pid $cell");
# my $sp = shift;
my $delay = shift;
sleep(2);
app->log->debug("end of sleep for cell $cell");
$delay->pass($cell % 2 ? 'OK' : 'NOK');
},
sub {
my $delay = shift;
my $result = shift;
app->log->debug("sending cell $cell from pid $$ - result was $result");
my $payload = {
type => 'ticket',
cell => $cell,
result => $result
};
$ws->send( { json => $payload } );
$delay->pass;
};
}
# add final step to notify end of processing
push @steps, sub {
my $delay = shift;
app->log->debug('sending end of process ->websocket');
$ws->send({json => { type => 'end' } });
$delay->pass;
};
my $delay = Mojo::IOLoop::Delay->new;
app->log->debug("Starting delay...");
$delay->steps( @steps );
app->log->debug("After the delay");
});
It is not possible to magically make Perl code non-blocking. That's why your blocking operation is holding up the websocket responses and event loop.
A single subprocess will not work for this, because only the original worker process that handled the request can respond to the websocket, and subprocesses can only return once. You can, however, use a subprocess to prepare each response you want to send. That said, your use of subprocesses is not quite correct.
The first subroutine passed to the subprocess executes in a fork and thus doesn't block the main process. The second subroutine executes in the parent once the subprocess completes, and receives the return value of the first subroutine. This is where you need to send your responses.
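As a minimal standalone sketch of that split (run as a plain script outside any web app; the names @order, $result and $error are only illustrative), the first callback below does the slow work in the fork and returns a plain data structure, and the second callback receives it in the parent:

```perl
use strict;
use warnings;
use Mojo::IOLoop;

my (@order, $result, $error);

Mojo::IOLoop->subprocess(
  sub {
    # Runs in the forked child: blocking here does not stall the parent.
    sleep 1;
    return { child_pid => $$, result => 'OK' };
  },
  sub {
    # Runs in the parent once the child has exited.
    my ($subprocess, $err, $payload) = @_;
    push @order, 'parent callback';
    ($error, $result) = ($err, $payload);
    Mojo::IOLoop->stop;    # only needed because this is a standalone script
  }
);

# Reached immediately: the subprocess has not been started yet.
push @order, 'after subprocess call';

Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

die "subprocess failed: $error" if $error;
print "order: @order\n";
print "child pid differs from parent: ",
  ($result->{child_pid} != $$ ? 'yes' : 'no'), "\n";
```

In a websocket handler the second callback is where $ws->send would go, and the explicit stop/start of the loop would be dropped since the daemon already runs it.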
Any code outside of that will be executed before the subprocess is even started. Because this is asynchronous code, you need to sequence the logic via callbacks. You can use promises to make complicated sequencing simpler.
use Mojo::Promise;
$ws->on(json => sub {
my ($ws, $msg) = @_;
app->log->debug('Received from websocket:', Dumper(\$msg));
unless($msg){
app->log->debug('Received empty message? WTF?!');
return;
}
my $prompt = $msg->{cmd};
return unless $prompt;
app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');
my $promise = Mojo::Promise->new->resolve; # starting point
# attach follow-up code for each cell, returning a new promise representing the whole chain so far
for my $cell (1..3) {
$promise = $promise->then(sub {
my $promise = Mojo::Promise->new;
Mojo::IOLoop->subprocess(sub {
app->log->debug("sending cell $cell");
sleep(2);
my $payload = {
type => 'ticket',
cell => $cell,
result => $cell % 2 ? 'OK' : 'NOK'
};
return $payload;
}, sub {
my ($sp, $err, $payload) = @_;
return $promise->reject($err) if $err; # indicates subprocess died
$ws->send( { json => $payload }, sub { $promise->resolve } );
});
# here, the subprocess has not been started yet
# it will be started when this handler returns to the event loop
# then the second callback will run once the subprocess exits
return $promise;
});
}
# chain from last promise
$promise->then(sub {
app->log->debug('sending end of process ->websocket');
$ws->send({json => { type => 'end' } });
})->catch(sub {
my $err = shift;
# you can send or log something here to indicate an error occurred in one of the subprocesses
});
});
There are some other options I can go into more detail on if they would be appropriate: Mojo::IOLoop::ReadWriteFork, which would let you start just one subprocess and continuously receive STDOUT from it (you would need to serialize your payload yourself to send it on STDOUT, for example with Mojo::JSON); or a regular subprocess that sends status information back to the parent over an external pub/sub broker that both processes can connect to, such as Postgres, Redis, or Mercury (this would also require serialization).
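To illustrate the idea behind the first option, here is a rough sketch that uses only core Perl (fork, pipe and JSON::PP) rather than Mojo::IOLoop::ReadWriteFork itself: one child does all the work and writes one JSON document per line, and the parent decodes each line as it arrives. The helper name start_worker and the $emit callback are made up for this example. Note that the read loop here blocks; in the real app the parent side would have to be driven by the event loop, which is the part the module is meant to take care of.

```perl
use strict;
use warnings;
use IO::Handle;
use JSON::PP qw(encode_json decode_json);

# Hypothetical helper: fork one child that runs $work->($emit) and
# return the child's pid plus a handle on which its progress lines arrive.
sub start_worker {
  my ($work) = @_;
  pipe(my $reader, my $writer) or die "pipe failed: $!";
  my $pid = fork();
  die "fork failed: $!" unless defined $pid;
  if ($pid == 0) {
    # Child: serialize every status update as one JSON line.
    close $reader;
    $writer->autoflush(1);
    $work->(sub { print {$writer} encode_json($_[0]), "\n" });
    close $writer;
    exit 0;
  }
  # Parent: keep only the read end.
  close $writer;
  return ($pid, $reader);
}

my ($pid, $reader) = start_worker(sub {
  my ($emit) = @_;
  for my $cell (1 .. 3) {
    # The real per-row processing would happen here.
    $emit->({ type => 'ticket', cell => $cell, result => $cell % 2 ? 'OK' : 'NOK' });
  }
  $emit->({ type => 'end' });
});

# Parent: each line is available as soon as the child has written it.
my @received;
while (my $line = <$reader>) {
  chomp $line;
  push @received, decode_json($line);   # in the app: $ws->send({json => ...})
}
close $reader;
waitpid($pid, 0);

printf "received %d messages, last type: %s\n", scalar @received, $received[-1]{type};
```

The trade-off compared with one subprocess per cell is that only a single fork is needed, at the cost of defining a small wire format between the two processes.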