angulartypescriptrxjsreactive-programmingobservable

How can I subscribe to multiple observables in Angular2 at once and wait, if there is new data on each of them?


I have an Angular component that uses 3 services. Each of those services has an observer that I can subscribe to. The view of the component must be updated if anything in the observed services changes. That happens through websockets (feathers.js).

I want the doSomethingWithTheNewDataThatIsShownOnView() to be called only once on ngInit and I think I can do this with forkJoin:

private ngOnInit(): void {
    Observable.forkJoin(
        this.filesService.inputs$,
        this.filesService.outputs$,
        this.processesService.items$
    ).subscribe(
        data => {
          this.inputs = data[0];
          this.outputs = data[1];
          this.processes = data[2];
          this.doSomethingWithTheNewDataThatIsShownOnView();
          this.ref.markForCheck();
        },
        err => console.error(err)
    );

    this.filesService.find();
    this.processesService.find();
}

That works as expected, but if there is new data on inputs$ Observable or outputs$ Observable, the subscribe is not being called again. It is only called if all three Observables have new data. Is there something like "wait for an interval of 100ms if all three observables have new data. Otherwise use only the new data of all observables, that have new data until now"?

I hope you understand what I want to do.


Solution

  • combineLatest() should do the trick:

    private ngOnInit(): void {
        Observable.combineLatest(
            this.filesService.inputs$,
            this.filesService.outputs$,
            this.processesService.items$
        ).subscribe(
            data => {
              this.inputs = data[0];
              this.outputs = data[1];
              this.processes = data[2];
              this.doSomethingWithTheNewDataThatIsShownOnView();
              this.ref.markForCheck();
            },
            err => console.error(err)
        );
    
        this.filesService.find();
        this.processesService.find();
    }
    

    Additional hints on your code:

    As an alternative combineLatest() also takes an optional aggregation-method and also as a side-note: try to use pure functions only and avoid stateful components(this.inputs, this.outputs, ect..) and use the | async pipe in the template and try to avoid too much logic in the subscription and if possible try to avoid manual subscriptions at all, because those have to be manually unsubscribed but the async-pipe could handle that automatically for you:

    autoUpdatedViewData$ = Observable.combineLatest(
            this.filesService.inputs$,
            this.filesService.outputs$,
            this.processesService.items$,
            (inputs, outputs, items) => ({inputs, outputs, items})
        )
        .map(({inputs, outputs, items}) => {
            return this.doSomethingWithTheNewDataThatIsShownOnView(inputs, outputs, items);
        });
    
    private ngOnInit(): void {
        this.filesService.find();
        this.processesService.find();
    }
    
    // in your template
    <div>{{autoUpdatedViewData$ | async}}</div>