ruby-on-railsrubyactioncable

Why doesn't this ActionCable broadcast work in test?


I'm trying to improve the tests for my ActionCable system but I'm having a hard time. In test, it seems like my stream_from("something") { ... } handlers aren't actually called. I was trying to debug and I made a minimal test case in a new Rails 7.2.1 app.

First, I made an empty ActionCable channel:

# app/channels/test_channel.rb
class TestChannel < ApplicationCable::Channel
end

Then, I wrote a test just to see if I could make it work:

require "test_helper"

class TestChannelTest < ActionCable::Channel::TestCase
  test "it works at all" do
    subscribe

    assert subscription.confirmed?

    last_message = nil
    subscription.stream_from("something") do |message|
      last_message = message
    end

    ActionCable.server.broadcast("something", 100)

    assert_equal(100, last_message)
  end
end

But this test fails:

# Running:

F

Failure:
TestChannelTest#test_it_works_at_all [test/channels/test_channel_test.rb:14]:
Expected: 100
  Actual: nil

As far as I can tell, the stream_from block is never called at all.

What am I missing? How can I make this test pass?


Solution

  • As pointed out by engineermsky in a comment, Rails's provided ActionCable test helpers stub the actual broadcasting of data. In order to run that in test, I had to copy some setup from other parts of the Rails test suite.

    Here's the full test, with lots of hacks, that actually ran broadcasts and provided a means to inspect them at the end of the test:

    # frozen_string_literal: true
    require "test_helper"
    
    class GraphqlChannelTest < ActionCable::Channel::TestCase
      module RealChannelStub
        def confirmed?
          subscription_confirmation_sent?
        end
    
        def real_streams
          streams
        end
      end
    
      def assert_has_real_stream(stream_name)
        assert subscription.real_streams.key?(stream_name), "Expected Stream #{stream_name.inspect} to be present in #{subscription.real_streams.keys}"
      end
    
      def setup
        @prev_server = ActionCable.server
        @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Async)
        @server.config.allowed_request_origins = [ 'http://rubyonrails.com' ]
    
        ActionCable.instance_variable_set(:@server, @server)
      end
    
      def teardown
        ActionCable.instance_variable_set(:@server, @prev_server)
      end
    
      def wait_for_async
        wait_for_executor Concurrent.global_io_executor
      end
    
      def run_in_eventmachine
        yield
        wait_for_async
      end
    
      def wait_for_executor(executor)
        # do not wait forever, wait 2s
        timeout = 2
        until executor.completed_task_count == executor.scheduled_task_count
          sleep 0.1
          timeout -= 0.1
          raise "Executor could not complete all tasks in 2 seconds" unless timeout > 0
        end
      end
    
      class Connection < ActionCable::Connection::Base
        attr_reader :websocket
    
        def send_async(method, *args)
          send method, *args
        end
    
        public :handle_close
      end
    
    
      module InterceptTransmit
        def transmit(msg)
          intercepted_messages << JSON.parse(msg)
          super
        end
    
        def intercepted_messages
          @intercepted_messages ||= []
        end
      end
    
      test "it subscribes and unsubscribes" do
        run_in_eventmachine do
          env = Rack::MockRequest.env_for "/test", "HTTP_HOST" => "localhost", "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket", "HTTP_ORIGIN" => "http://rubyonrails.com"
    
          @connection = Connection.new(ActionCable.server, env).tap do |connection|
            connection.process
            assert_predicate connection.websocket, :possible?
    
            wait_for_async
            assert_predicate connection.websocket, :alive?
            connection.websocket.singleton_class.prepend(InterceptTransmit)
          end
    
          @connection.subscriptions.add({"identifier" => "{\"channel\": \"GraphqlChannel\"}"})
    
          @subscription = @connection.subscriptions.instance_variable_get(:@subscriptions).values.first
          @subscription.singleton_class.prepend(RealChannelStub)
          assert subscription.confirmed?
    
          subscription.execute({
            "query" => "subscription { payload(id: \"abc\") { value } }"
          })
          wait_for_async
    
          sub_id = subscription.instance_variable_get(:@subscription_ids).first
          subscription_stream = "graphql-subscription:#{sub_id}"
          assert_has_real_stream subscription_stream
          topic_stream = "graphql-event::payload:id:abc"
          assert_has_real_stream topic_stream
    
          subscription.make_trigger({ "field" => "payload", "arguments" => { "id" => "abc"}, "value" =>  19 })
    
          wait_for_async
    
          @connection.handle_close
          wait_for_async
    
          expected_data = [
            {"identifier"=>"{\"channel\": \"GraphqlChannel\"}", "type"=>"confirm_subscription"},
            {"identifier"=>"{\"channel\": \"GraphqlChannel\"}", "message"=>{"result"=>{"data"=>{}}, "more"=>true}},
            {"identifier"=>"{\"channel\": \"GraphqlChannel\"}", "message"=>{"result"=>{"data"=>{"payload"=>{"value"=>19}}}, "more"=>true}},
            {"identifier"=>"{\"channel\": \"GraphqlChannel\"}", "message"=>{"more"=>false}},
          ]
          assert_equal expected_data, @connection.websocket.intercepted_messages
        end
      end
    
      class TestServer
        include ActionCable::Server::Connections
        include ActionCable::Server::Broadcasting
    
        attr_reader :logger, :config, :mutex
    
        class FakeConfiguration < ActionCable::Server::Configuration
          attr_accessor :subscription_adapter, :log_tags, :filter_parameters
    
          def initialize(subscription_adapter:)
            @log_tags = []
            @filter_parameters = []
            @subscription_adapter = subscription_adapter
          end
    
          def pubsub_adapter
            @subscription_adapter
          end
        end
    
        def initialize(subscription_adapter: SuccessAdapter)
          @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
          @config = FakeConfiguration.new(subscription_adapter: subscription_adapter)
          @mutex = Monitor.new
        end
    
        def pubsub
          @pubsub ||= @config.subscription_adapter.new(self)
        end
    
        def event_loop
          @event_loop ||= ActionCable::Connection::StreamEventLoop.new.tap do |loop|
            loop.instance_variable_set(:@executor, Concurrent.global_io_executor)
          end
        end
    
        def worker_pool
          @worker_pool ||= ActionCable::Server::Worker.new(max_size: 5)
        end
      end
    end