When I send messages from a faye-websocket server running as Rack middleware, I find that a message isn't actually sent to the client until the event handler block finishes.
Here are a few examples I've tried:
require 'faye/websocket'
require 'eventmachine'
require 'json'
Faye::WebSocket.load_adapter('thin')
module SocketTest
  # Rack middleware that upgrades WebSocket requests via faye-websocket and
  # hands every other request to the wrapped application.
  #
  # The message handler runs a slow, synchronous function between two sends.
  class Websocket
    # @param app the next Rack application in the stack
    def initialize(app)
      @app = app
    end

    # Stand-in for slow work: sleeps for 20 seconds, then returns "foo".
    def long_function
      sleep 20
      "foo"
    end

    # Rack entry point.
    #
    # @param env the Rack environment of the incoming request
    # @return the async Rack response for WebSocket requests, otherwise
    #   whatever the wrapped app returns
    def call(env)
      EM.run do
        # Anything that is not a WebSocket handshake goes to the wrapped app.
        next @app.call(env) unless Faye::WebSocket.websocket?(env)

        socket = Faye::WebSocket.new(env, nil, { ping: 15 })

        socket.on :open do |_event|
          greeting = { responseCode: 100, message: "Connection opened" }.to_json
          $logger.info "< #{greeting}"
          socket.send(greeting)
        end

        socket.on :message do |_event|
          ack = { responseCode: 100, message: "Received request, running slow function" }.to_json
          socket.send(ack)
          $logger.info "< #{ack}"

          # Synchronous call: this handler does not return until the
          # 20-second sleep inside long_function has finished.
          outcome = long_function

          done = { responseCode: 200, message: "Long function ran, result is #{outcome}" }.to_json
          socket.send(done)
          $logger.info "< #{done}"
          socket.close
        end

        socket.on :close do |_event|
          $logger.info "< CLOSE"
          socket = nil
        end

        # Return async Rack response
        socket.rack_response
      end
    end
  end
end
Chrome output
5:25:09pm WebSocket Connection Established
5:25:09pm {"request":"test"}
5:25:09pm {"responseCode":100,"message":"Connection opened"}
5:25:30pm {"responseCode":100,"message":"Received request, running slow function"}
5:25:30pm {"responseCode":200,"message":"Long function ran, result is foo"}
5:25:30pm Connection Close Frame
5:25:30pm Connection Close Frame
Console output
I, [2020-01-15T17:25:10.006006 #25394] INFO -- : < {"responseCode":100,"message":"Connection opened"}
I, [2020-01-15T17:25:10.024170 #25394] INFO -- : < {"responseCode":100,"message":"Received request, running slow function"}
I, [2020-01-15T17:25:30.034885 #25394] INFO -- : < {"responseCode":200,"message":"Long function ran, result is foo"}
I, [2020-01-15T06:25:30.189606 #25394] INFO -- : < CLOSE
As we can see above, the logger output for "Received request, running slow function" is immediate (17:25:10); however, that response takes an additional 20 seconds to reach the client (5:25:30pm in Chrome). I've noticed the same thing with PING/PONG messages - nothing comes from the WebSocket server until the blocking sleep finishes.
I've also tried a modified version like this:
# Just the call function, all other parts are the same
# Rack entry point (threaded variant).
#
# WebSocket requests are upgraded via faye-websocket; everything else is
# passed to the wrapped app. The message handler runs long_function in a
# separate thread and, while that thread is alive, sends a progress message
# and sleeps 5 seconds inside the handler itself.
def call(env)
  EM.run do
    # Anything that is not a WebSocket handshake goes to the wrapped app.
    next @app.call(env) unless Faye::WebSocket.websocket?(env)

    socket = Faye::WebSocket.new(env, nil, { ping: 15 })

    socket.on :open do |_event|
      greeting = { responseCode: 100, message: "Connection opened" }.to_json
      $logger.info "< #{greeting}"
      socket.send(greeting)
    end

    socket.on :message do |_event|
      ack = { responseCode: 100, message: "Received request, running slow function" }.to_json
      socket.send(ack)
      $logger.info "< #{ack}"

      # The slow work runs in its own thread, but this handler still waits
      # for it below, sleeping between progress messages.
      worker = Thread.new { long_function() }
      while worker.alive?
        progress = { responseCode: 100, message: "Waiting for long function to complete" }.to_json
        socket.send(progress)
        $logger.info "< #{progress}"
        sleep 5
      end

      # Thread#value joins the (already finished) thread and returns its result.
      done = { responseCode: 200, message: "Long function ran, result is #{worker.value}" }.to_json
      socket.send(done)
      $logger.info "< #{done}"
      socket.close
    end

    socket.on :close do |_event|
      $logger.info "< CLOSE"
      socket = nil
    end

    # Return async Rack response
    socket.rack_response
  end
end
Chrome output
5:34:33pm WebSocket Connection Established
5:34:33pm {"request":"test"}
5:34:33pm {"responseCode":100,"message":"Connection opened"}
5:34:53pm {"responseCode":100,"message":"Received request, running slow function"}
5:34:53pm {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm {"responseCode":200,"message":"Long function ran, result is foo"}
5:34:53pm Connection Close Frame
5:34:53pm Connection Close Frame
Console output
I, [2020-01-15T17:34:33.473295 #25729] INFO -- : < {"responseCode":100,"message":"Connection opened"}
I, [2020-01-15T17:34:33.489433 #25729] INFO -- : < {"responseCode":100,"message":"Received request, running slow function"}
I, [2020-01-15T17:34:33.489638 #25729] INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:38.490995 #25729] INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:43.491927 #25729] INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:48.497281 #25729] INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:53.499446 #25729] INFO -- : < {"responseCode":200,"message":"Long function ran, result is foo"}
I, [2020-01-15T17:34:53.632997 #25729] INFO -- : < CLOSE
The result is the same again. Although the $logger object continues to operate whilst waiting for the thread result (indicating to me we are no longer blocking), it appears the WebSocket is still holding and waiting for its buffer. How can I force this buffer out?
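Using the EventMachine add_timer function that @max_pleaner suggested, I was able to build a working version of the code. Instead of sleeping inside the handler, the function sends one message, schedules its own next run through a timer callback and returns immediately, which creates a loop without blocking: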
require 'faye/websocket'
require 'eventmachine'
require 'json'
Faye::WebSocket.load_adapter('thin')
module SocketTest
  # Rack middleware that upgrades WebSocket requests via faye-websocket and
  # hands every other request to the wrapped application.
  #
  # On each incoming message it starts a polling loop that reports progress
  # to the client. The loop never sleeps: every step sends one message and
  # schedules the next step with EventMachine.add_timer, then returns.
  class Websocket
    # Seconds between two polling steps.
    POLL_INTERVAL = 5
    # Maximum number of polling steps before the connection is closed.
    MAX_RUNS = 4

    # @param app the next Rack application in the stack
    def initialize(app)
      @app = app
    end

    # Rack entry point.
    #
    # @param env the Rack environment of the incoming request
    # @return the async Rack response for WebSocket requests, otherwise
    #   whatever the wrapped app returns
    def call(env)
      EM.run do
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, { ping: 15 })

          ws.on :open do |_event|
            send_json(ws, 100, "Connection opened")
          end

          ws.on :message do |_event|
            send_json(ws, 100, "Received request, running slow function")
            # Returns as soon as the first step has been sent and the next
            # one scheduled, so the handler itself never blocks.
            long_function(ws, 1, MAX_RUNS)
          end

          ws.on :close do |_event|
            $logger.info "< CLOSE"
            ws = nil
          end

          # Return async Rack response
          ws.rack_response
        else
          @app.call(env)
        end
      end
    end

    private

    # Builds the status payload, sends it over the socket and logs it.
    #
    # @param ws the socket to write to (must respond to #send)
    # @param code [Integer] value for the "responseCode" field
    # @param message [String] value for the "message" field
    def send_json(ws, code, message)
      payload = { responseCode: code, message: message }.to_json
      ws.send(payload)
      $logger.info "< #{payload}"
    end

    # Runs one step of the polling loop.
    #
    # Defined once on the class rather than inside the message handler, so
    # it is not re-defined every time a message arrives.
    #
    # @param ws the socket to report to (must respond to #send and #close)
    # @param count [Integer] 1-based number of the current step
    # @param total_runs [Integer] step limit; exceeding it closes the socket
    # @return [void]
    def long_function(ws, count, total_runs)
      if count > total_runs
        # Error out
        $logger.info "Reached full count, exiting"
        ws.close
        return
      end

      # Logic here (placeholder: the work counts as finished on the third step)
      if count == 3
        send_json(ws, 200, "Long function ran")
        ws.close
      else
        send_json(ws, 100, "Waiting for long function to complete")
        # Schedule the next step instead of sleeping.
        EventMachine.add_timer(POLL_INTERVAL) do
          long_function(ws, count + 1, total_runs)
        end
      end
    end
  end
end