Tags: ruby, multithreading, websocket, eventmachine

faye-websocket holds messages until the handler block finishes


When I send messages from a faye-websocket server running as Rack middleware, each message is not delivered to the client until the handler block finishes.

Here are a few examples I've tried:

No threading

require 'faye/websocket'
require 'eventmachine'
require 'json'

Faye::WebSocket.load_adapter('thin')

module SocketTest
  class Websocket

    def initialize(app)
      @app     = app
    end

    def long_function()
      sleep 20
      "foo"
    end    

    def call(env)
      EM.run {
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, {ping: 15 })
          ws.on :open do |event|
            response = {
              :responseCode => 100,
              :message => "Connection opened"
            }
            $logger.info "< #{response.to_json}"
            ws.send(response.to_json)
          end

          ws.on :message do |event|
            response = {
              :responseCode => 100,
              :message => "Received request, running slow function"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            long_function_result = long_function()
            response = {
              :responseCode => 200,
              :message => "Long function ran, result is #{long_function_result}"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            ws.close()
          end
          ws.on :close do |event|
            $logger.info "< CLOSE"
            ws = nil
          end

          # Return async Rack response
          ws.rack_response

        else
          @app.call(env)
        end
      }
    end    
  end
end

Chrome output

5:25:09pm   WebSocket Connection Established
5:25:09pm   {"request":"test"}
5:25:09pm   {"responseCode":100,"message":"Connection opened"}
5:25:30pm   {"responseCode":100,"message":"Received request, running slow function"}
5:25:30pm   {"responseCode":200,"message":"Long function ran, result is foo"}
5:25:30pm   Connection Close Frame
5:25:30pm   Connection Close Frame

Console output

I, [2020-01-15T17:25:10.006006 #25394]  INFO -- : < {"responseCode":100,"message":"Connection opened"}
I, [2020-01-15T17:25:10.024170 #25394]  INFO -- : < {"responseCode":100,"message":"Received request, running slow function"}
I, [2020-01-15T17:25:30.034885 #25394]  INFO -- : < {"responseCode":200,"message":"Long function ran, result is foo"}
I, [2020-01-15T06:25:30.189606 #25394]  INFO -- : < CLOSE

As the output above shows, the logger records "Received request, running slow function" immediately (17:25:10), but the client does not receive that message until 20 seconds later, once the block has finished. I've noticed the same thing with PING/PONG messages: nothing comes from the WebSocket server until the blocking sleep finishes.
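One way to read this output is that ws.send only queues the frame, and the queued data is written once the handler returns control to the event loop. The sketch below is a toy model of that behaviour in plain Ruby, not EventMachine's or faye-websocket's actual implementation. ToyReactor, send_later and tick are hypothetical names, and the 20-second sleep is shortened to 0.2 seconds.

```ruby
# Toy model only: a single-threaded loop that flushes queued writes
# after each handler returns. Not EventMachine's real code.
class ToyReactor
  attr_reader :flushed

  def initialize
    @outbox  = []   # [message, queued_at] pairs waiting to be written
    @flushed = []   # [message, queued_at, flushed_at] triples
    @start   = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Seconds elapsed since the reactor was created.
  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start
  end

  # Stand-in for ws.send: the message is queued, not written.
  def send_later(message)
    @outbox << [message, now]
  end

  # One loop iteration: run the handler to completion, then flush.
  def tick
    yield self
    flushed_at = now
    @flushed.concat(@outbox.map { |msg, queued_at| [msg, queued_at, flushed_at] })
    @outbox.clear
  end
end

reactor = ToyReactor.new
reactor.tick do |r|
  r.send_later("Received request")
  sleep 0.2                          # stands in for long_function's sleep 20
  r.send_later("Long function ran")
end

reactor.flushed.each do |msg, queued_at, flushed_at|
  puts format("%-18s queued at %.2fs, flushed at %.2fs", msg, queued_at, flushed_at)
end
```

In this model both messages are flushed at the same moment, after the sleep, which matches the identical 5:25:30pm timestamps in the Chrome output.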

I've also tried a modified version like this:

Long function moved to a thread

# Just the call function, all other parts are the same
    def call(env)
      EM.run {
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, {ping: 15 })
          ws.on :open do |event|
            response = {
              :responseCode => 100,
              :message => "Connection opened"
            }
            $logger.info "< #{response.to_json}"
            ws.send(response.to_json)
          end

          ws.on :message do |event|
            response = {
              :responseCode => 100,
              :message => "Received request, running slow function"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            long_function = Thread.new { long_function() }
            until (long_function.alive? == false) do
              response = {
                :responseCode => 100,
                :message => "Waiting for long function to complete"
              }
              ws.send(response.to_json)
              $logger.info "< #{response.to_json}"
              sleep 5

            end
            response = {
              :responseCode => 200,
              :message => "Long function ran, result is #{long_function.value}"
            }
            ws.send(response.to_json)
            $logger.info "< #{response.to_json}"
            ws.close()
          end
          ws.on :close do |event|
            $logger.info "< CLOSE"
            ws = nil
          end

          # Return async Rack response
          ws.rack_response

        else
          @app.call(env)
        end
      }
    end    

Chrome output

5:34:33pm   WebSocket Connection Established
5:34:33pm   {"request":"test"}
5:34:33pm   {"responseCode":100,"message":"Connection opened"}
5:34:53pm   {"responseCode":100,"message":"Received request, running slow function"}
5:34:53pm   {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm   {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm   {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm   {"responseCode":100,"message":"Waiting for long function to complete"}
5:34:53pm   {"responseCode":200,"message":"Long function ran, result is foo"}
5:34:53pm   Connection Close Frame
5:34:53pm   Connection Close Frame

Console output

I, [2020-01-15T17:34:33.473295 #25729]  INFO -- : < {"responseCode":100,"message":"Connection opened"}
I, [2020-01-15T17:34:33.489433 #25729]  INFO -- : < {"responseCode":100,"message":"Received request, running slow function"}
I, [2020-01-15T17:34:33.489638 #25729]  INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:38.490995 #25729]  INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:43.491927 #25729]  INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:48.497281 #25729]  INFO -- : < {"responseCode":100,"message":"Waiting for long function to complete"}
I, [2020-01-15T17:34:53.499446 #25729]  INFO -- : < {"responseCode":200,"message":"Long function ran, result is foo"}
I, [2020-01-15T17:34:53.632997 #25729]  INFO -- : < CLOSE

The result is the same again. The $logger object keeps writing while the handler waits for the thread result (which suggested to me that we are no longer blocking), yet the WebSocket still appears to hold its messages in a buffer until the block finishes. How can I force this buffer out?
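A likely reason the thread makes no difference is that the polling loop itself still sleeps inside the :message handler, so the handler does not return until the worker is done. The plain-Ruby sketch below illustrates only that point: the calling thread stays busy for the whole run even though the work happens elsewhere. It involves no WebSocket or EventMachine code, and the timings are shortened stand-ins for the question's 20 and 5 seconds.

```ruby
# Shortened stand-in for the question's long_function (sleep 20).
def long_function
  sleep 0.2
  "foo"
end

clock   = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
started = clock.call

worker = Thread.new { long_function }   # the work runs on another thread...
polls  = 0
while worker.alive?
  polls += 1
  sleep 0.05                            # ...but this sleep runs on the caller
end

blocked_for = clock.call - started
puts "caller was busy for #{blocked_for.round(2)}s over #{polls} polls"
puts "result: #{worker.value}"
```

If the caller here were the event loop's own thread, it could not write anything to the socket during that time, which would explain why every "Waiting" message arrives at 5:34:53pm.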


Solution

  • Using EventMachine's add_timer, as @max_pleaner suggested, I was able to build a working version of the code. Instead of sleeping, the function sends its message and then schedules another call to itself through a timer callback, which forms a loop without blocking:

    require 'faye/websocket'
    require 'eventmachine'
    require 'json'
    
    Faye::WebSocket.load_adapter('thin')
    
    module SocketTest
      class Websocket
    
        def initialize(app)
          @app     = app
        end
    
        def call(env)
          EM.run {
            if Faye::WebSocket.websocket?(env)
              ws = Faye::WebSocket.new(env, nil, {ping: 15 })
              ws.on :open do |event|
                response = {
                  :responseCode => 100,
                  :message => "Connection opened"
                }
                $logger.info "< #{response.to_json}"
                ws.send(response.to_json)
              end
    
              ws.on :message do |event|
                response = {
                  :responseCode => 100,
                  :message => "Received request, running slow function"
                }
                ws.send(response.to_json)
                $logger.info "< #{response.to_json}"
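                total_runs = 4
                # Each call sends one message and returns immediately;
                # the next step is scheduled with a timer rather than sleep.
                def long_function(ws, count, total_runs)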
                  if count > total_runs then
                    # Error out
                    puts "Reached full count, exiting"
                    ws.close()
                    return
                  end
                  # Logic here
                  if count == 3 then
                    response = {
                      :responseCode => 200,
                      :message => "Long function ran"
                    }
                    ws.send(response.to_json)
                    $logger.info "< #{response.to_json}"
                    ws.close()
                  else
                    response = {
                      :responseCode => 100,
                      :message => "Waiting for long function to complete"
                    }
                    ws.send(response.to_json)
                    $logger.info "< #{response.to_json}"
                    EventMachine.add_timer(5) {
                      long_function(ws, count + 1, total_runs)
                    }
                  end
                end
                long_function(ws, 1, total_runs)
              end
              ws.on :close do |event|
                $logger.info "< CLOSE"
                ws = nil
              end
    
              # Return async Rack response
              ws.rack_response
    
            else
              @app.call(env)
            end
          }
        end    
      end
    end
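The pattern in the solution, where each step does a little work, schedules the next step, and returns, can be sketched without any gems. The code below is a toy stand-in for a timer queue, not EventMachine itself. ToyTimers and its add_timer are hypothetical analogues of EventMachine.add_timer, the sent array stands in for messages handed to ws.send, and the 5-second delay is shortened to 0.05 seconds.

```ruby
# Toy stand-in for an event loop's timer queue, for illustration only.
class ToyTimers
  def initialize
    @timers = []   # [due_time, callback] pairs
  end

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Hypothetical analogue of EventMachine.add_timer(delay) { ... }.
  def add_timer(delay, &callback)
    @timers << [now + delay, callback]
  end

  # Run callbacks as they fall due until none remain.
  def run
    until @timers.empty?
      @timers.sort_by!(&:first)
      due, callback = @timers.shift
      wait = due - now
      sleep(wait) if wait > 0   # the loop waits here, not the callback
      callback.call
    end
  end
end

timers = ToyTimers.new
sent   = []   # stands in for messages handed to ws.send

# Each step records a message, then either finishes or reschedules itself.
poll = lambda do |count|
  if count == 3
    sent << "Long function ran"
  else
    sent << "Waiting (#{count})"
    timers.add_timer(0.05) { poll.call(count + 1) }
  end
end

poll.call(1)                  # returns as soon as the next step is scheduled
sent_before_run = sent.dup    # only the first message exists at this point
timers.run
puts sent.inspect
```

Because every step returns straight away, a real event loop would regain control between steps and could write each message to the socket before the next timer fires.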