rxjs concatmap

Why is concatMap not working as expected?


Please see my code below:

import { Observable, interval } from 'rxjs';
import { map, take, mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';

const frameworks = ['Backbone', 'Angular', 'React', 'Vue'];

const getRecruits = agency => new Observable(observer => {
  console.log('agency', agency);
  interval(1000).pipe(
    take(5)
  ).subscribe(val => observer.next(`${agency} Developer ${val}`));
});

// concatMap
interval(3000).pipe(
  take(4),
  map(val => frameworks[val]),
  concatMap(agency => getRecruits(agency))
).subscribe(val => console.log(val));

This is my output: only the Backbone values are logged.

I expected that after Backbone finished, it would continue with Angular, React and then Vue. But the execution stopped after Backbone.

Any explanations?


Solution

  • key knowledge

    concatMap only subscribes to the next inner observable once the previous one has completed.

    reason

    With that in mind, look at getRecruits: the inner interval observable completes after five values, but the Observable you create with new Observable(observer => ...) never does, because you only pass values to the observer via next. concatMap therefore keeps waiting for the Backbone observable to complete and never subscribes to Angular. To fix this, complete the returned observable when the inner subscription completes.

    solution

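    const getRecruits = agency => new Observable(observer => {
      const subscription = interval(1000).pipe(
        take(5)
      ).subscribe({
        next: val => observer.next(`${agency} Developer ${val}`),
        error: err => observer.error(err), // forward errors instead of silently dropping them
        complete: () => observer.complete() // the inner interval completes here, so complete the outer Observable as well
      });
      // teardown: stop the inner interval if the outer Observable is unsubscribed early
      return () => subscription.unsubscribe();
    });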
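
To make the "key knowledge" above concrete, here is a small dependency-free sketch of the queueing idea. It is a toy model, not RxJS and not how concatMap is actually implemented: a "source" here is just a function that receives an observer, and concatAll, neverCompletes, completes and run are hypothetical names made up for this illustration. Everything is synchronous so the two cases are easy to compare.

```javascript
// Toy model, NOT RxJS: a "source" is a function that takes an observer
// ({ next, complete }) and pushes values into it.

// Hypothetical helper: runs the sources one after another and starts the
// next one only when the current one calls complete().
function concatAll(sources, observer) {
  let index = 0;
  const startNext = () => {
    if (index >= sources.length) {
      observer.complete();
      return;
    }
    const source = sources[index++];
    source({
      next: value => observer.next(value),
      complete: startNext // the only trigger for moving on to the next source
    });
  };
  startNext();
}

// Emits two values but never completes (like getRecruits in the question).
const neverCompletes = agency => observer => {
  observer.next(`${agency} Developer 0`);
  observer.next(`${agency} Developer 1`);
};

// Emits the same values and then signals completion (like the fixed version).
const completes = agency => observer => {
  observer.next(`${agency} Developer 0`);
  observer.next(`${agency} Developer 1`);
  observer.complete();
};

// Collects everything a run emits so the two cases can be compared.
function run(makeSource) {
  const log = [];
  concatAll(
    ['Backbone', 'Angular'].map(agency => makeSource(agency)),
    { next: value => log.push(value), complete: () => log.push('done') }
  );
  return log;
}

const stalled = run(neverCompletes);
const finished = run(completes);

console.log(stalled);  // only the Backbone values; Angular is never started
console.log(finished); // Backbone values, then Angular values, then 'done'
```

In the first run the queue stalls after Backbone because nothing ever calls complete, which is the same symptom as in the question. In the second run each source signals completion, so the next one is started.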