ruby websocket haproxy faye

Faye Websocket Ruby not working as expected


I am trying to use haproxy to load balance my websocket rack application.

I publish a message to the 'rates' channel using redis-cli, and the server-side send succeeds: the line puts "sent" if ws.send(msg) prints "sent".

The client does receive the 'Welcome! from server' message so I know the initial handshake is done.

However, the client never receives the message published to the 'rates' channel.

web_socket.rb

require 'faye/websocket'
require 'redis'

module WebSocket
  class App
    KEEPALIVE_TIME = 15 # in seconds

    def initialize(app)
      @app     = app
      @mutex   = Mutex.new
      @clients = []
      # @redis   = Redis.new(host: 'rds', port: 6739)
      # Subscribe on a separate thread because subscribe blocks.
      Thread.new do
        @redis_sub = Redis.new(host: 'rds', port: 6379)
        @redis_sub.subscribe('rates') do |on|
          on.message do |channel, msg|
            p [msg, @clients.length]
            # Forward each published message to every connected client.
            @mutex.synchronize do
              @clients.each do |ws|
                # ws.ping 'Mic check, one, two' do
                p ws
                puts "sent" if ws.send(msg)
                # end
              end
            end
          end
        end
      end
    end

    def call(env)
      if Faye::WebSocket.websocket?(env)
        # WebSockets logic goes here
        ws = Faye::WebSocket.new(env, nil) # {ping: KEEPALIVE_TIME }
        ws.on :open do |event|
          p [:open, ENV['APPID'], ws.object_id]
          ws.ping 'Mic check, one, two' do
            # fires when pong is received
            puts "Welcome sent" if ws.send('Welcome! from server')
            @mutex.synchronize do
              @clients << ws
            end
            p [@clients.length, ' Client Connected']
          end
        end
        ws.on :close do |event|
          p [:close, ENV['APPID'], ws.object_id, event.code, event.reason]
          @mutex.synchronize do
            @clients.delete(ws)
          end
          p @clients.length
          ws = nil
        end
        ws.on :message do |event|
          p [:message, event.data]
          # @clients.each {|client| client.send(event.data) }
        end
        # Return async Rack response
        ws.rack_response
      else
        @app.call(env)
      end
    end
  end
end

My haproxy.cfg

frontend http
  bind *:8080
  mode http
  timeout client 1000s
  use_backend all
backend all
  mode http
  timeout server 1000s
  timeout tunnel 1000s
  timeout connect 1000s
  server s1 app1:8080
  server s2 app2:8080
  server s3 app3:8080
  server s4 app4:8080

Chrome dev tools (screenshot not included)

Please help me!!!

EDIT: I have tried the suggestion from "Thread running in Middleware is using old version of parent's instance variable", but it does not work.

As mentioned earlier, the line below succeeds:

puts "sent" if ws.send(msg)


Solution

  • After a lot of searching and testing, I found that the issue was that no ping interval was set when initializing the WebSocket on the server.

    Change this

    ws = Faye::WebSocket.new(env, nil) # {ping: KEEPALIVE_TIME }

    to

    ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })

    My KEEPALIVE_TIME is 0.5 seconds because I am building a stock application where rates change very quickly. Adjust it to suit your needs.
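
For reference, here is a minimal sketch of how the change could be wrapped so it is easy to reuse and to check without a live connection. The helper name build_socket and its socket_class parameter are my own additions for illustration and are not part of faye-websocket. The only library call it relies on is Faye::WebSocket.new(env, protocols, options) with the :ping option (seconds between ping frames), as used above.

```ruby
# Seconds between ping frames; 0.5 is the value mentioned above, tune as needed.
KEEPALIVE_TIME = 0.5

# Hypothetical helper (not a faye-websocket API): builds the server-side
# socket with keepalive pings enabled.
#
# env          - the Rack environment passed to the middleware's call method
# keepalive    - ping interval in seconds
# socket_class - assumption for illustration: lets a stub class be injected;
#                when omitted, faye/websocket is loaded and Faye::WebSocket used
def build_socket(env, keepalive: KEEPALIVE_TIME, socket_class: nil)
  socket_class ||= begin
    require 'faye/websocket'
    Faye::WebSocket
  end

  # Second argument is the list of subprotocols (none here);
  # third is the options hash carrying the ping interval.
  socket_class.new(env, nil, { ping: keepalive })
end
```

Inside the middleware's call method, this would stand in for the Faye::WebSocket.new line, for example ws = build_socket(env).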