Tags: javascript, buffer, rxjs, observable, subject

Buffering multiple subscribers to an RxJS observable


I have:

  var subject = new rx.Subject();
  var stream = rx.Observable.fromEvent(blah, 'event')
    .filter(blah)
    .map(blah)
    .subscribe(subject);

  return subject;

Then I pass subject to several different handlers which are going to process the event in different ways and at different speeds. So what I have in each handler is:

  subject.subscribe(async function (x) {
    const func = self[x.eventName];
    if (func) {
      await eventHandlerWrapper(self.handlerName, func, x);
    }
  });

I have two questions:

a) if the events come in super fast, is the handler going to process them synchronously and in the right order given the way I have it?

b) if the different handlers handle the event at different speeds, are they all going to wait till the slowest handler is through before the next event is provided? Or will they all sort of buffer and handle at their own pace?


Solution

  • First of all, the creation of the subject can be simplified like this:

    const subject = rx.Observable.fromEvent(blah, 'event')
      .filter(blah)
      .map(blah)
      .share();


    The share method multicasts the stream through an internal Subject. If you return this shared observable to every subscriber, you get the same behaviour with less code.
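    As a sketch of the same pattern in current RxJS pipe syntax (a Subject stands in for the event emitter, and the filter/map callbacks are made-up placeholders, since `blah` is not specified in the question):

    ```javascript
    const { Subject } = require('rxjs');
    const { filter, map, share } = require('rxjs/operators');

    const source = new Subject(); // stand-in for the 'event' emitter
    const shared = source.pipe(
      filter(x => x % 2 === 0),   // placeholder for your filter(blah)
      map(x => x * 10),           // placeholder for your map(blah)
      share()                     // multicast: one upstream, many subscribers
    );

    const a = [], b = [];
    shared.subscribe(x => a.push(x));
    shared.subscribe(x => b.push(x));

    [1, 2, 3, 4].forEach(x => source.next(x));
    console.log(a, b); // both subscribers saw the same values: [ 20, 40 ] [ 20, 40 ]
    ```

    Because of share, the upstream filter and map run once per event, no matter how many handlers subscribe.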

     a) if the events come in super fast, is the handler going to process
     them synchronously and in the right order given the way I have it?


    Events are pushed through the entire chain one by one, in order. An event that arrives through fromEvent travels through the whole chain, down to your subscriber, before the next value is handled (unless there is an async operator in between :)). Ben Lesh explained this at AngularConnect 2015: https://www.youtube.com/watch?v=KOOT7BArVHQ (you can watch the whole talk, but the comparison of arrays to observables is around minute 17).
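    A minimal way to see this per-value ordering, assuming a synchronous chain (the logging map is a stand-in for your operators):

    ```javascript
    const { Subject } = require('rxjs');
    const { map } = require('rxjs/operators');

    const order = [];
    const source = new Subject();
    source.pipe(
      map(x => { order.push(`map:${x}`); return x; }) // log when the operator runs
    ).subscribe(x => order.push(`sub:${x}`));         // log when the subscriber runs

    source.next(1);
    source.next(2);
    console.log(order); // [ 'map:1', 'sub:1', 'map:2', 'sub:2' ]
    ```

    Each value completes the whole map-then-subscribe trip before the next value enters the chain, so the logs interleave per value rather than grouping all the map calls first.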

     b) if the different handlers handle the event at different speeds, are
     they all going to wait till the slowest handler is through before the
     next event is provided? Or will they all sort of buffer and handle at
     their own pace?


    They will handle the events at their own pace. Check the following example:

    let interval$ = Rx.Observable.interval(1000).share();
    
    interval$.concatMap((val) => {
        console.log('called');
        return Rx.Observable.of(val).delay(3000)
      })
      .subscribe((val) => console.log("slow ", val));
    
    interval$.subscribe((val) => console.log("fast ", val));
    

    Here I use an interval observable made hot with share, so it sends out an event every second. One subscription takes a value, handles it (which takes 3 seconds, via the delay inside concatMap), and only then takes the next one. The other subscription processes values immediately. If you run this code (jsbin here: https://jsbin.com/zekalab/edit?js,console), you'll see that they both handle the events at their own pace.

    So they do not wait for the slowest handler; pending events are buffered internally (by concatMap, in this example) for the slow subscriber.

    The situation you are describing can become dangerous if the slowest processor is slower than the rate at which events arrive. In that case the internal buffer keeps growing, and eventually your application would crash. This concept is called back pressure: you are getting events faster than you are processing them. To avoid it, use operators like buffer or window on the slowest processors.
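    For example, here is a hedged sketch of batching with bufferCount (the batch size and the consumer are made up for illustration; bufferTime works similarly if you would rather batch by time):

    ```javascript
    const { Subject } = require('rxjs');
    const { bufferCount } = require('rxjs/operators');

    const events = new Subject();
    const batches = [];
    events.pipe(
      bufferCount(3)                        // hand the slow processor 3 events at a time
    ).subscribe(batch => batches.push(batch)); // process a whole batch per call

    for (let i = 1; i <= 6; i++) events.next(i);
    console.log(batches); // [ [ 1, 2, 3 ], [ 4, 5, 6 ] ]
    ```

    The slow handler is then invoked once per batch instead of once per event, which bounds how often it runs; the trade-off is that it must be written to process an array of events rather than a single one.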