Tags: typescript, rxjs, switchmap, mergemap

switchMap combined with mergeMap


I have an Observable where each new value should cause an HTTP request. On the client-side I only care about the latest response value; however, I want every request to complete for monitoring/etc. purposes.

What I currently have is something like:

function simulate(x) {
  // Simulate an HTTP request.
  return of(x).pipe(delay(6));
}

source$.pipe(
  someMapFunc(x => simulate(x)),
);

When I use switchMap for someMapFunc, I get the right set of responses (only the latest). However, if a request is still in flight when the next source value arrives, it gets canceled.

When I use mergeMap instead, I get the right set of requests (every request completes), but I get the wrong set of responses (every single one).

[Image: marble diagram of the code above]
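The two behaviours described above can be reproduced with a small sketch. Everything here is an assumption made for illustration: the `demo` helper, the `finished` log, and the timings (three source values 10 ms apart, each "request" taking 30 ms) are hypothetical, and the RxJS `TestScheduler` is used only so the timers run in virtual time.

```typescript
import { Observable, of, timer } from 'rxjs';
import { delay, map, mergeMap, switchMap, take, tap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

type Outcome = { responses: string[]; finished: string[] };

// Hypothetical stand-in for an HTTP call: responds after 30 virtual ms.
// `finished` records every request that actually delivered its response.
function simulate(x: string, finished: string[]): Observable<string> {
  return of(x).pipe(
    delay(30),
    tap(() => finished.push(x)), // never runs if the request is cancelled
  );
}

function demo(kind: 'switchMap' | 'mergeMap'): Outcome {
  const responses: string[] = [];
  const finished: string[] = [];
  // The assertion callback is unused; the scheduler only provides virtual time.
  const scheduler = new TestScheduler(() => {});
  scheduler.run(() => {
    // Emits 'a', 'b', 'c' at t = 0, 10, 20 ms.
    const source$ = timer(0, 10).pipe(take(3), map(i => ['a', 'b', 'c'][i]));
    const project = (x: string) => simulate(x, finished);
    const result$ = kind === 'switchMap'
      ? source$.pipe(switchMap(project))
      : source$.pipe(mergeMap(project));
    result$.subscribe(v => responses.push(v));
  });
  return { responses, finished };
}

console.log(demo('switchMap')); // responses: ['c'],           finished: ['c']
console.log(demo('mergeMap'));  // responses: ['a', 'b', 'c'], finished: ['a', 'b', 'c']
```

With switchMap, 'a' and 'b' are unsubscribed before their 30 ms elapse, so they never finish; with mergeMap, all three finish but all three are also emitted downstream.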

Is there a way to get the requests of mergeMap with the responses of switchMap? I know I can write this as a custom operator, but I'm wondering if I can build this out of existing/standard rxjs operators. To summarize what I'm thinking of:

Edit: Based on the accepted answer, I was able to get the following, which works:

function orderedMergeMap(project) {
  return (s) => defer(() => {
    let recent = 0;
    return s.pipe(
      mergeMap((data, idx) => {
        recent = idx;
        return project(data).pipe(filter(() => idx === recent));
      })
    );
  });
}

Solution

  • I'm not 100% sure this is what you're after, and I haven't fully tested it, but here is a custom operator that might come close. Maybe you can tinker with it a bit more.

    This is a mergeMap that filters out "old" values. An old value is an emission from an inner observable that arrives after a newer inner observable has already emitted.

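    import { Observable, OperatorFunction, defer } from 'rxjs';
    import { filter, map, mergeMap, tap } from 'rxjs/operators';

    function orderedMergeMap<T, R>(project: (v:T) => Observable<R>): OperatorFunction<T, R> {
      // defer gives every subscriber its own `recent` counter.
      return s => defer(() => {
        let recent = 0; // highest source index that has emitted a result so far
        return s.pipe(
          // Tag each source value with its index.
          map((v, i) => ({order: i, payload: v})),
          // Run every inner observable to completion, carrying the index along.
          mergeMap(({order, payload}) => project(payload).pipe(
            map(v => ({order, payload: v}))
          )),
          tap(({order}) => {
            if(order > recent) recent = order;
          }),
          // Keep only results from the newest source seen so far; drop older ones.
          filter(({order}) => order >= recent),
          map(({payload}) => payload)
        );
      });
    }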
    

    The version OP settled on:

    function orderedMergeMap<T, R>(project: (v:T) => Observable<R>): OperatorFunction<T, R> {
      return s => defer(() => { 
        let recent = 0; 
        return s.pipe( 
          mergeMap((data, idx) => { 
            recent = idx; 
            return project(data).pipe(
              filter(() => idx === recent)
            ); 
          }) 
        ); 
      }); 
    }
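The two versions are not quite equivalent, which a small virtual-time sketch can show. The names below (`latestByResponse`, `latestByRequest`, `collect`, `respond`, `Req`) and the latencies are hypothetical, chosen only for illustration; the first operator assumes the filter keeps results whose index is at least `recent`, and the second mirrors the version OP settled on.

```typescript
import { Observable, OperatorFunction, defer, timer } from 'rxjs';
import { filter, map, mergeMap, take, tap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Variant A: a result is dropped only once a newer request has *responded*.
function latestByResponse<T, R>(project: (v: T) => Observable<R>): OperatorFunction<T, R> {
  return s => defer(() => {
    let recent = 0;
    return s.pipe(
      map((v, i) => ({ order: i, payload: v })),
      mergeMap(({ order, payload }) =>
        project(payload).pipe(map(v => ({ order, payload: v })))),
      tap(({ order }) => { if (order > recent) recent = order; }),
      filter(({ order }) => order >= recent),
      map(({ payload }) => payload),
    );
  });
}

// Variant B: a result is dropped as soon as a newer request has *started*.
function latestByRequest<T, R>(project: (v: T) => Observable<R>): OperatorFunction<T, R> {
  return s => defer(() => {
    let recent = 0;
    return s.pipe(mergeMap((data, idx) => {
      recent = idx;
      return project(data).pipe(filter(() => idx === recent));
    }));
  });
}

type Req = { name: string; latency: number };

// Simulated request: emits the request name after `latency` virtual ms.
const respond = (r: Req) => timer(r.latency).pipe(map(() => r.name));

// Fires one request every 10 virtual ms and collects what the operator lets through.
function collect(op: OperatorFunction<Req, string>, reqs: Req[]): string[] {
  const out: string[] = [];
  new TestScheduler(() => {}).run(() => {
    timer(0, 10).pipe(take(reqs.length), map(i => reqs[i]), op)
      .subscribe(v => out.push(v));
  });
  return out;
}

// 'a' responds at t = 50, 'b' (started at t = 10) responds at t = 110.
const inOrder: Req[] = [{ name: 'a', latency: 50 }, { name: 'b', latency: 100 }];
// 'b' (started at t = 10) responds at t = 30, before 'a' at t = 100.
const outOfOrder: Req[] = [{ name: 'a', latency: 100 }, { name: 'b', latency: 20 }];

console.log(collect(latestByResponse(respond), inOrder));    // ['a', 'b']
console.log(collect(latestByRequest(respond), inOrder));     // ['b']
console.log(collect(latestByResponse(respond), outOfOrder)); // ['b']
console.log(collect(latestByRequest(respond), outOfOrder));  // ['b']
```

Both variants let every request run to completion. They differ only when an older response arrives while a newer request is still pending: variant A still emits it, variant B suppresses it.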