I'm building an application for a web-based slide show, where one 'master' user can move between slides and everyone's browsers follow along. To do this, I'm using websockets and Redis for a global channel to send messages through. Each client who connects has their info stored in an array, @clients.
Then I have a separate thread for subscribing to the Redis channel, in which there is an 'on.message' block defined which should send a message to everyone in the @clients array, but that array is empty inside this block (not empty anywhere else in the module).
Pretty much following this example: https://devcenter.heroku.com/articles/ruby-websockets
The relevant code, which is in a custom middleware class:
require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 # seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts @clients.count
          ### prints '0'; no clients receive msg
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
      ws.on :open do |event|
        @clients << ws
        puts @clients.count
        ### prints actual number of clients
      end
      ws.on :message do |event|
        $redis.publish(CHANNEL, event.data)
      end
      ws.on :close do |event|
        @clients.delete(ws)
        ws = nil
      end
      ws.rack_response
    else
      @app.call(env)
    end
  end
end
Is the @clients array empty when accessed inside the new thread because instance variables aren't shared across threads? If so, how do I share a variable across threads?
I have also tried using $clients (global variable, should be accessible across threads), to no avail.
UPDATED EDIT AT END: Shows working code. Main module unmodified except for debugging code. Note: I did experience the issue I already noted regarding the need to unsubscribe prior to termination.
The code looks correct. I'd like to see how you are instantiating it.
In config/application.rb, you probably have at least something like:
require 'ws_communication'
config.middleware.use WsCommunication
Then, in your JavaScript client, you should have something like this:
var ws = new WebSocket(uri);
Do you instantiate another instance of WsCommunication? That would set @clients to an empty array and could exhibit your symptoms. Something like this would be incorrect:
var ws = new WsCommunication;
It would help us if you would show the client and, perhaps, config/application.rb if this post does not help.
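To make the point about instances concrete, here is a minimal sketch in plain Ruby, with no Rack, Redis, or WebSockets involved. The `Registry` class and its method names are hypothetical stand-ins for the middleware, not part of your code. It illustrates that a thread started in `initialize` sees the same `@clients` array as the rest of that instance, while a second instance gets its own empty array:

```ruby
# Hypothetical stand-in for the middleware: one array, one background thread.
class Registry
  def initialize
    @clients = []
    @requests = Queue.new          # thread-safe: main thread -> worker
    @counts = Queue.new            # thread-safe: worker -> main thread
    # The block closes over self, so @clients in here is the same object
    # that #add appends to from the calling thread.
    @worker = Thread.new do
      while @requests.pop          # a nil request ends the loop
        @counts << @clients.count
      end
    end
  end

  def add(client)
    @clients << client
  end

  # Ask the background thread how many clients it can see.
  def count_seen_by_thread
    @requests << :count
    @counts.pop
  end

  def stop
    @requests << nil
    @worker.join
  end
end

first = Registry.new
first.add(:alice)
first.add(:bob)
second = Registry.new              # separate instance, separate @clients

puts first.count_seen_by_thread    # prints 2
puts second.count_seen_by_thread   # prints 0
first.stop
second.stop
```

If the thread in your middleware reports zero while `call` reports a real count, the two are most likely running against different instances (or different processes), not different "views" of one instance.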
By the way, I agree with the comment that @clients should be protected by a mutex on any update, if not reads as well. It's a dynamic structure that could change at any time in an event-driven system. redis-mutex is a good option if the lock has to span processes; within a single process, Ruby's built-in Mutex is enough. (Hope that link is correct, as GitHub seems to be throwing 500 errors on everything at the moment.)
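As a rough sketch of the mutex idea, assuming everything runs in one process so that Ruby's built-in `Mutex` is sufficient: `ClientList` and `FakeSocket` are hypothetical names used only for illustration, and the only thing assumed about a socket is that it responds to `send`.

```ruby
# Hypothetical helper: a client list guarded by Ruby's built-in Mutex.
class ClientList
  def initialize
    @clients = []
    @lock = Mutex.new
  end

  def add(ws)
    @lock.synchronize { @clients << ws }
  end

  def remove(ws)
    @lock.synchronize { @clients.delete(ws) }
  end

  def count
    @lock.synchronize { @clients.size }
  end

  # Copy the list under the lock, then send outside it, so a slow
  # socket does not block clients that are connecting or closing.
  def broadcast(msg)
    snapshot = @lock.synchronize { @clients.dup }
    snapshot.each { |ws| ws.send(msg) }
  end
end

# Stand-in for a WebSocket; only #send is needed here.
class FakeSocket
  attr_reader :received

  def initialize
    @received = []
  end

  def send(msg)
    @received << msg
  end
end

list = ClientList.new
a = FakeSocket.new
b = FakeSocket.new
list.add(a)
list.add(b)
list.remove(b)
list.broadcast("slide:3")
p a.received   # ["slide:3"]
p b.received   # []
```

In the middleware, `@clients << ws`, `@clients.delete(ws)` and the `@clients.each` in the subscriber thread would map onto `add`, `remove` and `broadcast` respectively.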
You might also note that $redis.publish returns an integer: the number of Redis subscribers that received the message.
Finally, you might find that you need to ensure that your channel is unsubscribed before termination. I've had situations where I've ended up sending each message multiple, even many, times because of earlier subscriptions to the same channel that weren't cleaned up. Since you are subscribing to the channel within a thread, you will need to unsubscribe within that same thread or the process will just "hang" waiting for the right thread to magically appear. I handle that situation by setting an "unsubscribe" flag and then sending a message. Then, within the on.message block, I test for the unsubscribe flag and issue the unsubscribe there.
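The flag-and-message approach could look roughly like this. It is a sketch, not my exact code: `subscribe_until_stopped` and `stop_requested` are hypothetical names, and `redis_sub` is assumed to be a dedicated redis-rb connection (for example `Redis.new(...)`), of which only `subscribe` and `unsubscribe` are used.

```ruby
# Sketch of the flag-and-message shutdown.
# redis_sub:      assumed to be a dedicated redis-rb connection
# stop_requested: hypothetical callable, true once shutdown was requested
# handler:        block that receives each normal message
def subscribe_until_stopped(redis_sub, channel, stop_requested, &handler)
  redis_sub.subscribe(channel) do |on|
    on.message do |_channel, msg|
      if stop_requested.call
        # This runs on the subscribing thread, so the blocking
        # subscribe call above can return once it is processed.
        redis_sub.unsubscribe(channel)
      else
        handler.call(msg)
      end
    end
  end
end
```

A caller would run this inside the subscriber thread, then, at shutdown, set the flag and publish any message on the channel from the other connection so the `on.message` block wakes up and sees the flag.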
The module you provided, with only minor debugging modifications:
require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 # seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts "Message event. Clients receiving:#{@clients.count};"
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
      ws.on :open do |event|
        @clients << ws
        puts "Open event. Clients open:#{@clients.count};"
      end
      ws.on :message do |event|
        receivers = $redis.publish(CHANNEL, event.data)
        puts "Message published:#{event.data}; Receivers:#{receivers};"
      end
      ws.on :close do |event|
        @clients.delete(ws)
        puts "Close event. Clients open:#{@clients.count};"
        ws = nil
      end
      ws.rack_response
    else
      @app.call(env)
    end
  end
end
The test subscriber code I provided:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'

EM.run do
  ws = WebSocket::Client::Simple.connect url

  ws.on :message do |msg|
    puts msg
  end

  ws.on :open do
    puts "-- Subscriber open (#{ws.url})"
  end

  ws.on :close do |e|
    puts "-- Subscriber close (#{e.inspect})"
    exit 1
  end

  ws.on :error do |e|
    puts "-- Subscriber error (#{e.inspect})"
  end
end
The test publisher code I provided. Publisher and Subscriber could easily be combined, as these are just tests:
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'

EM.run do
  count ||= 0
  timer = EventMachine.add_periodic_timer(5+rand(5)) do
    count += 1
    send({"MESSAGE": "COUNT:#{count};"})
  end

  @ws = WebSocket::Client::Simple.connect url

  @ws.on :message do |msg|
    puts msg
  end

  @ws.on :open do
    puts "-- Publisher open"
  end

  @ws.on :close do |e|
    puts "-- Publisher close (#{e.inspect})"
    exit 1
  end

  @ws.on :error do |e|
    puts "-- Publisher error (#{e.inspect})"
    @ws.close
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end
end
A sample config.ru which runs all this at the rack middleware layer:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new
This is Main. I stripped it down from my running version, so it might need tweaking if you use it:
%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }

class Main < Sinatra::Base
  # Local to the class body; the route blocks below close over it.
  env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

  get "/" do
    erb :"index.html"
  end

  get "/assets/js/application.js" do
    content_type :js
    @scheme = env == "production" ? "wss://" : "ws://"
    erb :"application.js"
  end
end