Tags: websocket, rspec, hanami

How to test WebSockets for Hanami?


Using the following:

I've been able to add WebSockets to Hanami; however, as this is for production code, I want to add specs, but I can't find any information on how to test WebSockets with Hanami using RSpec.

I've been able to find this for RoR, but nothing that is non-Rails-specific or Hanami-specific. I have asked on the Hanami Gitter but have not gotten a response yet.

Is the TCR gem the only way? I would prefer something simpler, but if I must use it, how would I set it up for anycable-go via litecable?

How can I test WebSockets for Hanami using RSpec?


Solution

  • Getting this working requires several moving parts. The first is the socket simulator, which simulates the receiving socket on the web server:

    Note: url_path should be customized to whatever works for your specific WebSocket endpoint.

    # frozen_string_literal: true
    
    require 'puma'
    require 'lite_cable/server'
    require_relative 'sync_client'
    
    # Test helper that boots an in-process Puma server wrapping the LiteCable
    # middleware and hands out a matching client, user and credentials.
    class SocketSimulator
      attr_accessor :server_logs
    
      # @param x_site_id_header [String, nil] when given, sent as the site-id
      #   header value instead of the generated site id
      def initialize(x_site_id_header: nil)
        @x_site_id_header = x_site_id_header
        @server_logs = []
      end
    
      # Builds the Puma server listening on 127.0.0.1:3099 and starts it from a
      # new thread, which is kept so that #teardown can join it.
      def start
        app = LiteCable::Server::Middleware.new(nil, connection_class: Api::Sockets::Connection)
        server = Puma::Server.new(app, Puma::Events.strings)
        server.add_tcp_listener '127.0.0.1', 3099
        server.min_threads = 1
        server.max_threads = 4
        @server = server
    
        @server_thread = Thread.new { @server.run.join }
      end
    
      # Stops the server (if one was started), joins its thread and empties the logs.
      def teardown
        @server.stop(true) unless @server.nil?
        @server_thread.join unless @server_thread.nil?
        @server_logs.clear
      end
    
      # Memoized SyncClient connected to the simulated server with this
      # simulator's connection token and headers.
      def client
        @client ||= begin
          url_path = "/ws?connection_token=#{connection_token}"
          SyncClient.new("ws://127.0.0.1:3099#{url_path}", headers: headers, cookies: '')
        end
      end
    
      # Memoized user fabricated with a random e-mail and password for #site_id.
      def user
        @user ||= begin
          email = "#{SecureRandom.hex}@mailinator.com"
          password = SecureRandom.hex
          Fabricate.create :user, email: email, site_id: site_id, password: password
        end
      end
    
      # Request headers sent by #client: the bearer token plus the site id,
      # where an explicit x_site_id_header takes precedence over #site_id.
      def headers
        bearer = "Bearer #{jwt}"
        site_header = @x_site_id_header || site_id
    
        { 'AUTHORIZATION' => bearer, 'X_HANAMI_DIRECT_BOOKINGS_SITE_ID' => site_header }
      end
    
      # Random hex token used in the connection URL; generated once per simulator.
      def connection_token
        return @connection_token if @connection_token
    
        @connection_token = SecureRandom.hex
      end
    
      # Random hex site id; generated once per simulator.
      def site_id
        return @site_id if @site_id
    
        @site_id = SecureRandom.hex
      end
    
      # Memoized JWT produced by the GenerateJwt interactor for #user and #site_id.
      def jwt
        return @jwt if @jwt
    
        @jwt = Interactors::Users::GenerateJwt.new(user, site_id).call.jwt
      end
    end
    

    The next thing is the SyncClient, which is a fake client you can use to actually connect to the simulated socket:

    # frozen_string_literal: true
    
    # Synchronous websocket client
    # Copied and modified from https://github.com/palkan/litecable/blob/master/spec/support/sync_client.rb
    class SyncClient
      # JSON is used by #on_message and #send_message, so load it explicitly
      # instead of relying on another library having required it.
      require 'json'
      require 'websocket-client-simple'
      require 'concurrent'
      require 'socket'
    
      # Seconds to wait for an event the caller expects to arrive.
      WAIT_WHEN_EXPECTING_EVENT = 5
      # Seconds to wait before concluding that no further event is coming.
      WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5
    
      # Atomic counter of received messages whose 'type' is 'ping'.
      attr_reader :pings
    
      # Connects to +url+ and waits up to WAIT_WHEN_EXPECTING_EVENT seconds for
      # the connection to open.
      #
      # @param url [String] WebSocket URL to connect to
      # @param headers [Hash] request headers
      # @param cookies [String] merged into the headers under the 'COOKIE' key
      def initialize(url, headers: {}, cookies: '')
        @messages = Queue.new
        @closed = Concurrent::Event.new
        @has_messages = Concurrent::Semaphore.new(0)
        @pings = Concurrent::AtomicFixnum.new(0)
    
        @open = Concurrent::Promise.new
    
        @ws = set_up_web_socket(url, headers.merge('COOKIE' => cookies))
    
        # NOTE(review): wait! presumably re-raises a failure recorded by
        # #on_error while the promise was pending; confirm against concurrent-ruby.
        @open.wait!(WAIT_WHEN_EXPECTING_EVENT)
      end
    
      # @return [String, nil] the first private IPv4 address of this host, or
      #   nil when the host has none
      def ip
        # Safe navigation instead of ActiveSupport's Object#try, which is not
        # available outside Rails unless ActiveSupport happens to be loaded.
        Socket.ip_address_list.detect(&:ipv4_private?)&.ip_address
      end
    
      # Opens the socket and registers this client's callbacks (as bound Method
      # objects) for the error, open, message and close events.
      #
      # @param url [String] WebSocket URL
      # @param headers [Hash] headers passed to the underlying client
      # @return the object returned by WebSocket::Client::Simple.connect
      def set_up_web_socket(url, headers)
        WebSocket::Client::Simple.connect(
          url,
          headers: headers
        ) do |ws|
          ws.on(:error, &method(:on_error))
    
          ws.on(:open, &method(:on_open))
    
          ws.on(:message, &method(:on_message))
    
          ws.on(:close, &method(:on_close))
        end
      end
    
      # Error callback. While the connection is still opening the error fails
      # the open promise; afterwards it is queued so that the next read raises it.
      def on_error(event)
        event = RuntimeError.new(event.message) unless event.is_a?(Exception)
    
        if @open.pending?
          @open.fail(event)
        else
          @messages << event
          @has_messages.release
        end
      end
    
      # Open callback: fulfils the open promise that #initialize waits on.
      def on_open(_event = nil)
        @open.set(true)
      end
    
      # Message callback. Close frames mark the client closed, 'ping' messages
      # only bump the ping counter, and every other message is parsed from JSON
      # and queued for the readers.
      def on_message(event)
        if event.type == :close
          @closed.set
        else
          message = JSON.parse(event.data)
          if message['type'] == 'ping'
            @pings.increment
          else
            @messages << message
            @has_messages.release
          end
        end
      end
    
      # Close callback: marks the client closed.
      def on_close(_event = nil)
        @closed.set
      end
    
      # Waits up to WAIT_WHEN_EXPECTING_EVENT seconds for a message and returns it.
      #
      # @return [Object] the parsed JSON message
      # @raise [Exception] a queued error is re-raised
      # @raise [ThreadError] from the non-blocking pop when the queue is still
      #   empty after the wait
      def read_message
        @has_messages.try_acquire(1, WAIT_WHEN_EXPECTING_EVENT)
    
        msg = @messages.pop(true)
        raise msg if msg.is_a?(Exception)
    
        msg
      end
    
      # Collects messages until none arrives within the applicable wait: the
      # long wait while fewer than +expected_size+ messages have been read, the
      # short wait afterwards.
      #
      # @param expected_size [Integer] number of messages the caller expects
      # @return [Array] the messages read, in arrival order
      # @raise [Exception] a queued error is re-raised
      def read_messages(expected_size = 0)
        list = []
        loop do
          list_is_smaller = list.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT
          break unless @has_messages.try_acquire(1, list_is_smaller)
    
          msg = @messages.pop(true)
          raise msg if msg.is_a?(Exception)
    
          list << msg
        end
        list
      end
    
      # Serializes +message+ to JSON and sends it over the socket.
      def send_message(message)
        @ws.send(JSON.generate(message))
      end
    
      # Closes the socket after a short pause for late messages.
      #
      # @raise [RuntimeError] when queued messages have not been read
      def close
        sleep WAIT_WHEN_NOT_EXPECTING_EVENT
    
        raise "#{@messages.size} messages unprocessed" unless @messages.empty?
    
        @ws.close
        wait_for_close
      end
    
      # Waits up to WAIT_WHEN_EXPECTING_EVENT seconds for the closed event.
      def wait_for_close
        @closed.wait(WAIT_WHEN_EXPECTING_EVENT)
      end
    
      # @return [Boolean] whether the closed event has been set
      def closed?
        @closed.set?
      end
    end
    

    The last part is a fake channel to test against:

    # frozen_string_literal: true
    
    # Minimal channel registered under the :fake identifier, used as a target
    # for the socket specs.
    class FakeChannel < Api::Sockets::ApplicationChannel
      identifier :fake
    
      # Subscription hook: logs the rejection decision, calls reject when
      # can_reject? holds, then logs and starts streaming from stream_location.
      # NOTE(review): whether reject stops the rest of this method depends on
      # the parent class, which is not shown here.
      def subscribed
        logger.info("Can Reject? #{can_reject?}")
        reject if can_reject?
    
        logger.debug("Streaming from #{stream_location}")
        stream_from(stream_location)
      end
    
      # Action that transmits the string 'bar' back to the subscriber.
      def foo
        transmit 'bar'
      end
    
      # Unsubscription hook: transmits a farewell message.
      def unsubscribed
        transmit(message: 'Goodbye channel!')
      end
    
      # Logs the params and reports whether 'value_to_check' (0 when absent)
      # is greater than 5.
      def can_reject?
        logger.info("PARAMS: #{params}")
        checked_value = params.fetch('value_to_check', 0)
        checked_value > 5
      end
    end
    

    To use in specs:

    # frozen_string_literal: true
    
    require_relative '../../../websockets-test-utils/fake_channel'
    require_relative '../../../websockets-test-utils/socket_simulator'
    
    # Integration spec: starts the simulated socket server, subscribes real
    # SyncClient connections to the fake channel and checks which of them
    # receive the message sent by the Broadcast interactor.
    RSpec.describe Interactors::Channels::Broadcast, db_truncation: true do
      subject(:interactor) { described_class.new(token: connection_token, loc: 'fake', message: message) }
    
      # JSON-encoded channel identifier sent with the subscribe command and
      # echoed back in the server's replies.
      let(:identifier) { { channel: 'fake' }.to_json }
    
      let(:socket_simulator) { SocketSimulator.new }
      let(:client) { socket_simulator.client }
      let(:user) { socket_simulator.user }
      let(:connection_token) { socket_simulator.connection_token }
      # NOTE(review): `channel` is not referenced anywhere in this spec.
      let(:channel) { 'fake' }
      let(:message) { 'woooooo' }
    
      # The server must be listening before `client` is first evaluated,
      # because SyncClient connects in its constructor.
      before do
        socket_simulator.start
      end
    
      after do
        socket_simulator.teardown
      end
    
      describe 'call' do
        before do
          client.send_message command: 'subscribe',
                              identifier: identifier
        end
    
        it 'broadcasts a message to the correct channel' do
          # Messages are read in arrival order: welcome, then the subscription
          # confirmation, then the broadcast triggered below.
          expect(client.read_message).to eq('type' => 'welcome')
          expect(client.read_message).to eq(
            'identifier' => identifier,
            'type' => 'confirm_subscription'
          )
          interactor.call
          expect(client.read_message).to eq(
            'identifier' => identifier,
            'message' => message
          )
        end
    
        context 'with other connection' do
          # NOTE(review): user2, jwt and site_id are defined but never used;
          # client2 connects with empty headers and a random connection token.
          let(:user2) { Fabricate.create :user }
          let(:jwt) { Interactors::Users::GenerateJwt.new(user2, site_id).call.jwt }
          let(:site_id) { socket_simulator.site_id }
          let(:url_path) { "/ws?connection_token=#{SecureRandom.hex}" }
          let(:client2) { SyncClient.new("ws://127.0.0.1:3099#{url_path}", headers: {}, cookies: '') }
    
          before do
            client2.send_message command: 'subscribe',
                                 identifier: identifier
          end
    
          it "doesn't broadcast to connections that shouldn't get it" do
            aggregate_failures 'broadcast!' do
              expect(client2.read_message).to eq('type' => 'welcome')
              expect(client2.read_message).to eq(
                'identifier' => identifier,
                'type' => 'confirm_subscription'
              )
              expect(client.read_message).to eq('type' => 'welcome')
              expect(client.read_message).to eq(
                'identifier' => identifier,
                'type' => 'confirm_subscription'
              )
              interactor.call
              # Presumably gives the broadcast time to reach every connection
              # before the clients are inspected.
              sleep 1
    
              expect(client.read_message).to eq(
                'identifier' => identifier,
                'message' => message
              )
              # SyncClient#close raises when unread messages remain, so a clean
              # close shows that client2 did not receive the broadcast.
              expect { client2.close }.not_to raise_exception
            end
          end
        end
      end
    end