ruby-on-rails multithreading redis em-websocket

Accessing a variable within a rails thread


I'm building an application for a web-based slide show, where one 'master' user can move between slides and everyone's browsers follow along. To do this, I'm using websockets and Redis for a global channel to send messages through. Each client who connects has their info stored in an array, @clients. I then have a separate thread that subscribes to the Redis channel; it defines an 'on.message' block that should send the message to everyone in the @clients array, but that array is empty inside this block (it is not empty anywhere else in the module).

Pretty much following this example: https://devcenter.heroku.com/articles/ruby-websockets

The relevant code, which is in a custom middleware class:

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []

    uri = URI.parse(ENV['REDISCLOUD_URL'])
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts @clients.count
          ### prints '0,' no clients receive msg
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

      ws.on :open do |event|
        @clients << ws
        puts @clients.count
        ### prints actual number of clients
      end

      ws.on :message do |event|
        $redis.publish(CHANNEL, event.data)
      end

      ws.on :close do |event|
        @clients.delete(ws)
        ws = nil
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
end

Is the @clients array empty when accessed inside the new thread because instance variables aren't shared across threads? If so, how do I share a variable across threads?

I have also tried using $clients (global variable, should be accessible across threads), to no avail.


Solution

  • UPDATED EDIT AT END: Shows working code. Main module unmodified except for debugging code. Note: I did experience the issue I already noted regarding the need to unsubscribe prior to termination.

    The code looks correct. I'd like to see how you are instantiating it.

    In config/application.rb, you probably have at least something like:

    require 'ws_communication'
    config.middleware.use WsCommunication
    

    Then, in your JavaScript client, you should have something like this:

    var ws = new WebSocket(uri);
    

    Do you instantiate another instance of WsCommunication? That would set @clients to an empty array and could exhibit your symptoms. Something like this would be incorrect:

    var ws = new WsCommunication;
    

    It would help us if you would show the client and, perhaps, config/application.rb if this post does not help.
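To illustrate the point about instances, here is a minimal sketch showing that a thread started inside an object sees that object's instance variables, while a second instance starts with its own empty array. `Registry`, `add`, `poke` and `seen_by_thread` are hypothetical names made up for this example; they stand in for the middleware and are not part of your code or of any library.

```ruby
# Hypothetical stand-in for the middleware: one array, one background thread.
class Registry
  attr_reader :seen_by_thread

  def initialize
    @clients = []
    @seen_by_thread = Queue.new
    @wake = Queue.new
    Thread.new do
      @wake.pop                        # block until told to look
      @seen_by_thread << @clients.size # same object, so the same @clients
    end
  end

  def add(client)
    @clients << client
  end

  def poke
    @wake << :go
  end
end

first = Registry.new
first.add(:alice)
first.add(:bob)
first.poke
count_in_first = first.seen_by_thread.pop   # the thread sees both clients

second = Registry.new                       # a fresh instance has its own empty array
second.poke
count_in_second = second.seen_by_thread.pop

puts "first: #{count_in_first}, second: #{count_in_second}"
```

If the thread in your app reports zero clients, one thing worth checking is whether the object that started the thread is the same object whose `call` method is adding to `@clients`.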

    By the way, I agree with the comment that @clients should be protected by a mutex on any update, if not on reads as well. It's a dynamic structure that could change at any time in an event-driven system. redis-mutex is a good option. (Hope that link is correct, as GitHub seems to be throwing 500 errors on everything at the moment.)
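As a rough sketch of the locking idea, the example below guards the array with Ruby's built-in `Mutex` rather than redis-mutex, on the assumption that all the threads involved live in one process. `ClientList` and `FakeSocket` are hypothetical names for this illustration; `FakeSocket` only stands in for a websocket object that responds to `send`.

```ruby
# Hypothetical wrapper around the clients array; every access takes the lock.
class ClientList
  def initialize
    @lock = Mutex.new
    @clients = []
  end

  def add(ws)
    @lock.synchronize { @clients << ws }
  end

  def remove(ws)
    @lock.synchronize { @clients.delete(ws) }
  end

  def count
    @lock.synchronize { @clients.size }
  end

  # Copy the list under the lock, then send outside it, so a slow socket
  # does not hold up add/remove on other threads.
  def broadcast(msg)
    snapshot = @lock.synchronize { @clients.dup }
    snapshot.each { |ws| ws.send(msg) }
  end
end

# Stand-in for a websocket object; only #send is needed here.
class FakeSocket
  attr_reader :inbox

  def initialize
    @inbox = []
  end

  def send(msg)
    @inbox << msg
  end
end

list = ClientList.new
sockets = Array.new(20) { FakeSocket.new }
sockets.map { |s| Thread.new { list.add(s) } }.each(&:join)

list.broadcast('next-slide')
list.remove(sockets.first)
puts list.count
```

In the middleware, `@clients = ClientList.new` could replace the bare array, with `add`, `remove` and `broadcast` used in the `:open`, `:close` and `on.message` handlers respectively.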

    You might also note that $redis.publish returns an integer: the number of subscribers that received the message.

    Finally, you might find that you need to ensure that your channel is unsubscribed before termination. I've had situations where I've ended up sending each message multiple, even many, times because of earlier subscriptions to the same channel that weren't cleaned up. Since you are subscribing to the channel within a thread, you will need to unsubscribe within that same thread or the process will just "hang" waiting for the right thread to magically appear. I handle that situation by setting an "unsubscribe" flag and then sending a message. Then, within the on.message block, I test for the unsubscribe flag and issue the unsubscribe there.
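The flag-and-wake-up approach described above might look roughly like this. `UnsubscribeGuard` is a hypothetical helper name, not part of redis-rb, and `FakeConnection` is a stand-in so the sketch runs without a Redis server. The only behavior assumed of the real connection is that it responds to `unsubscribe` when called from inside the `on.message` block.

```ruby
# Hypothetical helper (not part of redis-rb): remembers that an unsubscribe
# was requested and performs it on the subscribing thread.
class UnsubscribeGuard
  def initialize
    @lock = Mutex.new
    @requested = false
  end

  # Safe to call from any thread.
  def request!
    @lock.synchronize { @requested = true }
  end

  def requested?
    @lock.synchronize { @requested }
  end

  # Call from inside on.message. `connection` must respond to #unsubscribe.
  # Returns :unsubscribed when the message only served as a wake-up;
  # otherwise yields the message for normal delivery and returns :delivered.
  def handle(connection, msg)
    if requested?
      connection.unsubscribe
      :unsubscribed
    else
      yield msg
      :delivered
    end
  end
end

# Stand-in for the subscribed Redis connection, so this runs without a server.
class FakeConnection
  attr_reader :unsubscribe_calls

  def initialize
    @unsubscribe_calls = 0
  end

  def unsubscribe
    @unsubscribe_calls += 1
  end
end

guard = UnsubscribeGuard.new
conn = FakeConnection.new
delivered = []

first = guard.handle(conn, 'slide-2') { |m| delivered << m }   # normal delivery
guard.request!                                                 # set the flag during shutdown
second = guard.handle(conn, 'wake-up') { |m| delivered << m }  # wake-up triggers unsubscribe
```

In the middleware, the body of `on.message` would become something like `guard.handle(redis_sub, msg) { |m| @clients.each { |ws| ws.send(m) } }`, and the shutdown path would call `guard.request!` and then publish any message to the channel so the subscribing thread wakes up and unsubscribes itself.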

    The module you provided, with only minor debugging modifications:

    require 'faye/websocket'
    require 'redis'
    
    class WsCommunication
      KEEPALIVE_TIME = 15 #seconds
      CHANNEL = 'vip-deck'
    
      def initialize(app)
        @app = app
        @clients = []
        uri = URI.parse(ENV['REDISCLOUD_URL'])
        $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
        Thread.new do
          redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
          redis_sub.subscribe(CHANNEL) do |on|
            on.message do |channel, msg|
              puts "Message event. Clients receiving:#{@clients.count};"
              @clients.each { |ws| ws.send(msg) }
            end
          end
        end
      end
    
      def call(env)
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
    
          ws.on :open do |event|
            @clients << ws
            puts "Open event. Clients open:#{@clients.count};"
          end
    
          ws.on :message do |event|
            receivers = $redis.publish(CHANNEL, event.data)
            puts "Message published:#{event.data}; Receivers:#{receivers};"
          end
    
          ws.on :close do |event|
            @clients.delete(ws)
            puts "Close event. Clients open:#{@clients.count};"
            ws = nil
          end
    
          ws.rack_response
        else
          @app.call(env)
        end
      end
    end
    

    The test subscriber code I provided:

    # encoding: UTF-8
    puts "Starting client-subscriber.rb"
    $:.unshift File.expand_path '../lib', File.dirname(__FILE__)
    require 'rubygems'
    require 'eventmachine'
    require 'websocket-client-simple'
    
    puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
    
    url = ARGV.shift || 'ws://localhost:3000'
    
    EM.run do
    
      ws = WebSocket::Client::Simple.connect url
    
      ws.on :message do |msg|
        puts msg
      end
    
      ws.on :open do
        puts "-- Subscriber open (#{ws.url})"
      end
    
      ws.on :close do |e|
        puts "-- Subscriber close (#{e.inspect})"
        exit 1
      end
    
      ws.on :error do |e|
        puts "-- Subscriber error (#{e.inspect})"
      end
    
    end
    

    The test publisher code I provided. Publisher and Subscriber could easily be combined, as these are just tests:

    # encoding: UTF-8
    puts "Starting client-publisher.rb"
    $:.unshift File.expand_path '../lib', File.dirname(__FILE__)
    require 'rubygems'
    require 'eventmachine'
    require 'json'
    require 'websocket-client-simple'
    
    puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
    
    url = ARGV.shift || 'ws://localhost:3000'
    
    EM.run do
      count ||= 0
      timer = EventMachine.add_periodic_timer(5+rand(5)) do
        count += 1
        send({"MESSAGE": "COUNT:#{count};"})
      end
    
      @ws = WebSocket::Client::Simple.connect url
    
      @ws.on :message do |msg|
        puts msg
      end
    
      @ws.on :open do
        puts "-- Publisher open"
      end
    
      @ws.on :close do |e|
        puts "-- Publisher close (#{e.inspect})"
        exit 1
      end
    
      @ws.on :error do |e|
        puts "-- Publisher error (#{e.inspect})"
        @ws.close
      end
    
      def self.send message
        payload = message.is_a?(Hash) ? message : {payload: message}
        @ws.send(payload.to_json)
      end
    end
    

    A sample config.ru which runs all this at the rack middleware layer:

    require './controllers/main'
    require './middlewares/ws_communication'
    use WsCommunication
    run Main.new
    

    This is Main. I stripped it down from my running version, so it might need tweaking if you use it:

    %w(rubygems bundler sinatra/base json erb).each { |m| require m }
    ENV['RACK_ENV'] ||= 'development'
    Bundler.require
    $: << File.expand_path('../', __FILE__)
    $: << File.expand_path('../lib', __FILE__)
    
    Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    
    class Main < Sinatra::Base
      env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    
      get "/" do
        erb :"index.html"
      end
    
      get "/assets/js/application.js" do
        content_type :js
        @scheme = env == "production" ? "wss://" : "ws://"
        erb :"application.js"
      end
    end