Tags: ruby-on-rails, ruby, cqrs, event-sourcing, rails-event-store

Subscribe only to a single stream with RailsEventStore


I intend to project a persistent read-model into a relational database with RailsEventStore.

To make this possible I need an input stream with a fixed order and without duplicated events. Therefore I built a linker that listens to all events and links those events relevant for my projection into a separate stream.

Now I would like to register a builder for a given read model as a subscriber to that stream, but I can't find a way to subscribe to a specific stream.

Is this even possible? If so, how can it be done?


Solution

  • There is no way to subscribe to events from a selected stream (although the idea seems interesting). Instead, implement your builder so that it reads from the given stream and processes events periodically. It needs to store the ID of the last processed event, so that the next time it runs it reads only the new domain events linked to the stream.

    The (pseudo) code could look like:

    class Builder
      def initialize(event_store)
        @event_store = event_store
        @last_processed_event_id = load_last_processed_event
      end
    
      def call(stream)
        events = read_all_since(stream, @last_processed_event_id)
        process(events)
        store_last_processed_event
      end
    
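      private

      def read_all_since(stream, id)
        scope = @event_store.read.stream(stream)
        # `from` expects an event id, so only apply it once a checkpoint exists
        scope = scope.from(id) if id
        scope.to_a
      end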
    
      def process(events)
        events.each do |event|
          # ... update the read model here
          @last_processed_event_id = event.event_id
        end
      end
    
      def load_last_processed_event
        # read last processed event or return nil to start from beginning of stream
      end
    
      def store_last_processed_event
        # store it somewhere
      end
    end
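
To see the checkpointing idea in isolation, here is a minimal, self-contained sketch that runs without RailsEventStore. `InMemoryStream`, `Checkpoint`, `ReadModelBuilder` and the `Event` struct are hypothetical stand-ins invented for this illustration, not part of the library; the stream's `read_after` only imitates "read everything after a given event id". It shows that repeated polling passes apply each event exactly once and in stream order.

```ruby
require "securerandom"

# Hypothetical stand-in for a stored event; only id and payload matter here.
Event = Struct.new(:event_id, :data)

# Hypothetical in-memory stream (assumption: not a RailsEventStore class).
class InMemoryStream
  def initialize
    @events = []
  end

  def append(data)
    event = Event.new(SecureRandom.uuid, data)
    @events << event
    event
  end

  # Returns the events strictly after the given id,
  # or the whole stream when id is nil (first run).
  def read_after(id)
    return @events.dup if id.nil?

    index = @events.index { |e| e.event_id == id }
    raise ArgumentError, "unknown event id: #{id}" if index.nil?

    @events[(index + 1)..]
  end
end

# Hypothetical checkpoint storage. A real projection would probably keep this
# in the same database as the read model.
class Checkpoint
  attr_accessor :last_event_id
end

class ReadModelBuilder
  attr_reader :rows

  def initialize(stream, checkpoint)
    @stream = stream
    @checkpoint = checkpoint
    @rows = []
  end

  # One polling pass: apply unseen events in order and advance the checkpoint.
  def call
    @stream.read_after(@checkpoint.last_event_id).each do |event|
      @rows << event.data
      @checkpoint.last_event_id = event.event_id
    end
  end
end

stream = InMemoryStream.new
checkpoint = Checkpoint.new
builder = ReadModelBuilder.new(stream, checkpoint)

stream.append("order placed")
stream.append("order paid")
builder.call # first pass handles both events
builder.call # second pass finds nothing new
stream.append("order shipped")
builder.call # third pass handles only the new event
```

In a real setup, the `call` pass would be triggered by a scheduler or background job, and advancing the checkpoint together with the read-model update (for example in one database transaction) helps avoid processing an event twice after a crash.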