javascriptnode.jsobservablerxjsfrp

How to chain observables in rx.js


I have an observable that is pulling events from a server, filtering events for the application type, then subscribing and dispatching the event to one or more handlers to handle.

The handlers then go off and do some async update to the db, and I find that the observable will crank out events so fast that the updates are stepping on each other. Which I should have expected.

So I think I need my handlers to each employ its own observable to act as a queue which will handle one event and wait for an ack.

So my question is, how can I create an observable that receives messages continuously and dispatches one message at a time waiting for an acknowledgment before releasing the next message?

Also the observables need to be Cold I think, as I can not lose messages.


Solution

  • I think the operator concatMap does something close to what you are looking for. You can review a former answer here on SO to illustrate a similar use case for concatMap : RxJS queueing dependent tasks

    It is close but not exactly what you want as there is no waiting for an ACK signal to release the next value. Instead, concatMap use the completion signal of the currently 'executed' observable to subscribe to the next one. If your observable contains somewhere the execution of an update on a db then those updates will be executed in order. For instance:

    function handler (source$) {
      // source$ is your source of events from which you generate the update calls
      return source$.concatMap(function (event){
        return updateDB(event);
      })
    }
    
    function updateDB(event) {
      return Rx.Observable.create(function(observer){
        // do the update in the db
        // you probably have a success and error handler 
        // you plug the observer notification into those handlers
        if (success) {
          // if you need to pass down some value from the update
          observer.onNext(someValue);
          // In any case, signal completion to allow concatMap to move to next update
          observer.onCompleted();
        }
        if (error) {observer.onError(error);}
      })
    }
    

    This is a generic code to specialize to the library you are using. You might be able to use directly the operator fromNodeCallback, or fromCallback depending on the API of your database update function.

    All the same, be mindful that there might be some buffering involved to hold on to the next observable while the current one is being executed, and that buffer can only be finite, so if you do have significant differences in speed between producer and consumer, or memory limitation, you might want to handle things differently.

    Also, if you are using RxJS v5, onError becomes error, onComplete becomes complete, onNext becomes next (cf. new observer interface).

    Last comment, the lossy/lossless nature of your stream is a concept different to the hot vs. cold nature of the stream. You can have a look at illustrated subscription and data flows for both type of streams.