I am trying to use haproxy to load balance my websocket rack application.
I publish message in channel rates
using redis-cli and this succeeds puts "sent" if ws.send(msg)
The client does receive the 'Welcome! from server'
message so I know the initial handshake is done.
But, the client never receives the published message in channel 'rates'.
web_socket.rb
require 'faye/websocket'
module WebSocket
class App
KEEPALIVE_TIME = 15 # in seconds
def initialize(app)
@app = app
@mutex = Mutex.new
@clients = []
# @redis = Redis.new(host: 'rds', port: 6739)
Thread.new do
@redis_sub = Redis.new(host: 'rds', port: 6379)
@redis_sub.subscribe('rates') do |on|
on.message do |channel, msg|
p [msg,@clients.length]
@mutex.synchronize do
@clients.each do |ws|
# ws.ping 'Mic check, one, two' do
p ws
puts "sent" if ws.send(msg)
# end
end
end
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
# WebSockets logic goes here
ws = Faye::WebSocket.new(env, nil) # {ping: KEEPALIVE_TIME }
ws.on :open do |event|
p [:open, ENV['APPID'], ws.object_id]
ws.ping 'Mic check, one, two' do
# fires when pong is received
puts "Welcome sent" if ws.send('Welcome! from server')
@mutex.synchronize do
@clients << ws
end
p [@clients.length, ' Client Connected']
end
end
ws.on :close do |event|
p [:close, ENV['APPID'], ws.object_id, event.code, event.reason]
@mutex.synchronize do
@clients.delete(ws)
end
p @clients.length
ws = nil
end
ws.on :message do |event|
p [:message, event.data]
# @clients.each {|client| client.send(event.data) }
end
# Return async Rack response
ws.rack_response
else
@app.call(env)
end
end
end
end
My haproxy.cfg
frontend http
bind *:8080
mode http
timeout client 1000s
use_backend all
backend all
mode http
timeout server 1000s
timeout tunnel 1000s
timeout connect 1000s
server s1 app1:8080
server s2 app2:8080
server s3 app3:8080
server s4 app4:8080
Please help me!!!
EDIT: I have tried Thread running in Middleware is using old version of parent's instance variable but this does not work.
As mentioned earlier. The below code succeeds
puts "sent" if ws.send(msg)
Okay, After a lot of searching and testing.. I found that the issue was with not setting a ping. during websocket initialization in the server.
Change this
ws = Faye::WebSocket.new(env, nil) # {ping: KEEPALIVE_TIME }
to
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
My KEEPALIVE_TIME is 0.5 because I am making a stock application where rates change very quickly. You can keep it per your needs.