angular rxjs observable behaviorsubject subject

How to combine Subjects and Observables with Angular rxjs


I have the following two subscriptions:

this.service1.source1.subscribe(countries => {
  this.filteredData =  [];
  countries.forEach(country => {
    this.filteredData = this.data.filter(user => {
      return user.country == country;
    });
  });
});

this.service2.source2.subscribe(companies => {
  companies.forEach(company => {
    this.filteredData =  [];
    this.filteredData = this.data.filter(user => {
      return user.company == company;
    });
  });
});

Here source1 and source2 are BehaviorSubjects, and both subscriptions receive their events.

I want to combine the two subscriptions, because the data should be filtered by both results together.

I tried forkJoin and zip:

zip([this.service1.source1, this.service1.source2]).subscribe(results => {
  console.log('zip results:', results);
});

forkJoin([this.service1.source1, this.service1.source2]).subscribe(results => {
  console.log('forkJoin results:', results);
});

But with both of those (zip and forkJoin) nothing is logged to the console, as if the subscribe callback never runs when events are emitted.

How can I combine the results of both sources into one subscription?


Solution

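  • forkJoin is not suitable for your use case: it emits only once, with the last value emitted by each observable, and only after all of the observables COMPLETE. A BehaviorSubject does not complete until complete() is called on it, so forkJoin stays silent in the meantime.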

    zip may not give you the desired behavior either, because it waits until every input stream has emitted its n-th value; as soon as that condition is met, it combines those n-th values and emits them as the n-th combined value.

    So in neither case do you get an emission each time one of the observables emits. Since this.service1.source1 and this.service1.source2 are BehaviorSubjects, zip does produce one initial emission (from the two initial values), but every later emission occurs only after both observables have emitted again.

    I suggest using combineLatest, because once every input stream has emitted at least one value, it emits the combination of the latest values from each input stream whenever any of them emits.

    combineLatest([this.service1.source1, this.service1.source2]).subscribe(results => {
      console.log('combineLatest results:', results);
    });
    

    And since this.service1.source1 and this.service1.source2 are BehaviorSubjects, which are defined as:

    A variant of Subject that requires an initial value and emits its current value whenever it is subscribed to.

    you are guaranteed to get an emission as soon as you subscribe, and again whenever either of the observables emits a value.
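For the filtering itself, here is a rough sketch of what the body of the combined subscription could call. It is only an illustration under a few assumptions that are not in the question: the `User` shape (the `name` field is made up), the helper name `filterUsers`, the sample data, and the choice to treat an empty selection as "no filter on that field". The `countries$` and `companies$` names in the closing comment are placeholders for your two BehaviorSubjects.

```typescript
// Hypothetical shape of one entry in `this.data`; only `country` and
// `company` appear in the question, `name` is added for the demo.
interface User {
  name: string;
  country: string;
  company: string;
}

// Keep the users that match the selected countries AND the selected companies.
// Assumption: an empty selection means "do not filter on this field".
function filterUsers(data: User[], countries: string[], companies: string[]): User[] {
  return data.filter(user =>
    (countries.length === 0 || countries.indexOf(user.country) !== -1) &&
    (companies.length === 0 || companies.indexOf(user.company) !== -1)
  );
}

// Made-up sample data, for illustration only.
const data: User[] = [
  { name: 'Ana', country: 'US', company: 'Acme' },
  { name: 'Ben', country: 'US', company: 'Globex' },
  { name: 'Cy', country: 'FR', company: 'Acme' },
];

// Both selections applied at once: only Ana is in the US and works at Acme.
console.log(filterUsers(data, ['US'], ['Acme']).map(u => u.name));

// In the component, the combined subscription could then look roughly like
// this (not executed here, since it needs RxJS and the Angular services):
//
// combineLatest([countries$, companies$]).subscribe(([countries, companies]) => {
//   this.filteredData = filterUsers(this.data, countries, companies);
// });
```

Because combineLatest hands you the latest value of both sources every time, the filter is always recomputed from the full `this.data`, so one selection never overwrites the result of the other, which is what happens with two separate subscriptions.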