javascript rxjs

How can I run a concurrent mergeMap in RxJS with a variable batch size?


I have a stream of predicates I want to send to an API to execute. I only need to identify the first four (for example) that return true.

Although I can send API requests in parallel, each API call is expensive: I want to minimize the number of unnecessary API calls, while still utilizing parallelism where possible.

The simplest solution I found uses concatMap:

predicates$.pipe(
    concatMap(callExpensiveAPI),
    filter(isTruthy),
    take(4)
).subscribe();

However, concatMap only runs a single API request at a time, waiting for each to complete before starting another.

Replacing concatMap with mergeMap maximizes parallelism to a fault: I could be firing off twenty API calls right away, even when the first four predicates already return true.

The concurrency parameter for mergeMap seems to get me very close to the desired behavior:

predicates$.pipe(
    mergeMap(callExpensiveAPI, 4), // Limit to four concurrent API calls
    filter(isTruthy),
    take(4)
).subscribe();

However, there are still two shortcomings:

  1. The order of predicates (and results) is not preserved. I'll only get four truthy predicates, but their order in the stream is dependent on each API call's response time.

  2. A chance of "overrun" still exists. If the first three predicates are truthy, but the fourth truthy predicate is deep in the stream, the above pipeline will still be firing four API calls at once. There's a chance that, after initiating the fourth API call destined to return true, a few more calls (up to three!) will be made for subsequent predicates in the stream.

    At best, those calls are simply unnecessary. At worst, a subsequent call will return true before the actual "fourth truthy" predicate, and the end result will not actually be the first four truthy predicates.


Ideally, I would like an RxJS pipeline where the concurrency parameter "shrinks" for each approved item in the downstream pipeline. When only one more predicate is needed, the concurrency parameter would be 1, causing mergeMap to behave like concatMap.

How might I implement such a pipeline?


Solution

  • One caveat with this approach: suppose the buffer window is 15 and the first batch yields 14 truthy values. The window then shrinks to 1, but if the remaining truthy element is 14 elements away, the pipeline makes 14 sequential calls before finding it. So the approach in your question (mergeMap with a fixed concurrency) is more efficient for larger buffer windows; for smaller windows, you could try the approach below.


    This is my best attempt at your problem. A BehaviorSubject triggers the emission of each batch, and the current batch size is kept in a local variable, concurrencyCount.

    let i = 0;
    let concurrencyCount = 4;
    const subject = new BehaviorSubject([]);
    const predicates$ = subject.pipe(
      startWith(null),
      switchMap(() =>
        interval(0).pipe(
          take(concurrencyCount),
          map(() => {
            i++;
            return i;
          }),
          toArray()
        )
      )
    );
    

    This function simulates the expensive API: every fourth value is truthy, and each call takes one second.

    const callExpensiveAPI = (val: any) => {
      console.log('inside merge map', val, ' - ', val % 4 === 0);
      // `of` already returns an Observable, so wrapping it in `from` is unnecessary
      return of(val % 4 === 0).pipe(delay(1000));
    };
    

    predicates$
      .pipe(
        // ensure each buffer happens sequentially.
        concatMap((items: any) => {
          return from(items).pipe(
            // expensive api
            mergeMap(callExpensiveAPI, Infinity),
            // filter out negative values
            filter(isTruthy),
            // ensure parallel processing is completed before evaluating
            toArray(),
            map((items: any) => {
              // minus the count from the concurrencyCount
              concurrencyCount -= items.length;
              return items;
            })
          );
        }),
    

    We pipe predicates$ through concatMap so that the batches run one after another, and turn each batch array back into a stream with from. Inside, mergeMap runs the API calls for that batch in parallel (the batch size determines the degree of parallelism).

    We use toArray to ensure the parallel processing has completed before evaluating the batch.

    We use filter, which you already have, to drop the falsy results, and map to subtract the number of truthy results from the concurrencyCount variable.


    The BehaviorSubject accumulates the truthy results as state, which we can read later.

    Finally, takeWhile keeps the pipeline running only while concurrencyCount is greater than zero.

    tap((values: any) => subject.next([...subject.value, ...values])),
    takeWhile(() => concurrencyCount > 0)
    

    In the subscriber, the next callback runs once a batch has completed and concurrencyCount has been updated. By then the tap above has already pushed to the subject, which triggers the next batch with the new concurrencyCount.

    The complete callback finally outputs the filtered values, which are simply the values stored in the BehaviorSubject.

    complete: () => {
      console.log('complete', subject.value);
    },
    

    Full Code:

    import {
      of,
      map,
      mergeMap,
      interval,
      filter,
      take,
      from,
      delay,
      tap,
      concatMap,
      toArray,
      takeWhile,
      startWith,
      switchMap,
      BehaviorSubject,
    } from 'rxjs';
    let i = 0;
    let concurrencyCount = 4;
    const subject = new BehaviorSubject([]);
    const predicates$ = subject.pipe(
      startWith(null),
      switchMap(() =>
        interval(0).pipe(
          take(concurrencyCount),
          map(() => {
            i++;
            return i;
          }),
          toArray()
        )
      )
    );
    
    const callExpensiveAPI = (val: any) => {
      console.log('inside merge map', val, ' - ', val % 4 === 0);
      // `of` already returns an Observable, so wrapping it in `from` is unnecessary
      return of(val % 4 === 0).pipe(delay(1000));
    };
    const isTruthy = (val: any) => !!val;
    predicates$
      .pipe(
        // ensure each buffer happens sequentially.
        concatMap((items: any) => {
          return from(items).pipe(
            // expensive api
            mergeMap(callExpensiveAPI, Infinity),
            // filter out negative values
            filter(isTruthy),
            // ensure parallel processing is completed before evaluating
            toArray(),
            map((items: any) => {
              // minus the count from the concurrencyCount
              concurrencyCount -= items.length;
              return items;
            })
          );
        }),
        tap((values: any) => subject.next([...subject.value, ...values])),
        takeWhile(() => concurrencyCount > 0)
      )
      .subscribe({
        next: (values: any) => {
          console.log('next', subject.value, values);
        },
        complete: () => {
          console.log('complete', subject.value);
        },
      });
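
To see how the batch size shrinks, and how the caveat at the top of this answer plays out, here is a small synchronous model of the batching logic. It makes no API calls and does not use RxJS; `simulateShrinkingBatches` and `BatchTrace` are hypothetical names introduced only for this illustration, and `results[k]` is assumed to be what the API would return for the k-th predicate.

```typescript
// Hypothetical model of the shrinking-batch scheme (not part of the pipeline above).
interface BatchTrace {
  batchSizes: number[]; // size of each sequential round
  calls: number;        // total number of API calls made
  found: number[];      // indices of the truthy predicates, in source order
}

function simulateShrinkingBatches(results: boolean[], needed: number): BatchTrace {
  const trace: BatchTrace = { batchSizes: [], calls: 0, found: [] };
  let next = 0; // index of the first predicate not yet sent
  while (trace.found.length < needed && next < results.length) {
    // Each round sends exactly as many predicates as are still needed.
    const size = Math.min(needed - trace.found.length, results.length - next);
    for (let k = next; k < next + size; k++) {
      if (results[k]) trace.found.push(k);
    }
    trace.batchSizes.push(size);
    trace.calls += size;
    next += size;
  }
  return trace;
}

// The demo's rule: the values 1, 2, 3, ... are truthy when divisible by 4.
const demoTrace = simulateShrinkingBatches(
  Array.from({ length: 20 }, (_, k) => (k + 1) % 4 === 0),
  4
);
console.log(demoTrace.batchSizes); // [4, 3, 3, 2, 1, 1, 1, 1]
console.log(demoTrace.calls);      // 16

// The caveat: a window of 15, 14 hits in the first batch, the last hit 14 elements later.
const worstCase = [...Array(14).fill(true), ...Array(14).fill(false), true];
const worstTrace = simulateShrinkingBatches(worstCase, 15);
console.log(worstTrace.batchSizes.length); // 15 rounds: one batch of 15, then 14 single calls
```

With one second per round, the demo finishes in about eight rounds, while the worst case needs fifteen rounds for 29 calls, which is where a fixed concurrency would be faster at the cost of possible overrun.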
    

    Stackblitz Demo
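
Note that the demo above stores the API results (`true`) rather than the predicates themselves, because filter runs on what the API returns. If you need the predicates back in source order, the same shrinking-batch idea could be sketched without the BehaviorSubject feedback loop. This is an alternative under stated assumptions, not the code above: it swaps mergeMap plus toArray for forkJoin (which keeps results in input order) and drives the rounds with expand. It assumes the predicates are available as an array and that each API call emits exactly one boolean and completes; `firstTruthy` and `State` are hypothetical names.

```typescript
import { EMPTY, Observable, expand, forkJoin, last, map, of } from 'rxjs';

// Hypothetical helper state: what has been found so far and where to resume.
interface State<T> {
  found: T[];   // truthy predicates so far, in source order
  next: number; // index of the first predicate not yet sent
}

function firstTruthy<T>(
  predicates: T[],
  call: (p: T) => Observable<boolean>, // assumed to emit one boolean, then complete
  needed: number
): Observable<T[]> {
  const step = (s: State<T>): Observable<State<T>> => {
    const remaining = needed - s.found.length;
    // The batch is never larger than the number of results still needed.
    const batch = predicates.slice(s.next, s.next + Math.max(remaining, 0));
    if (batch.length === 0) return EMPTY; // done, or out of predicates
    // forkJoin runs the whole batch in parallel and preserves input order.
    return forkJoin(batch.map(call)).pipe(
      map((results) => ({
        found: [...s.found, ...batch.filter((_, k) => results[k])],
        next: s.next + batch.length,
      }))
    );
  };
  const seed: State<T> = { found: [], next: 0 };
  // expand feeds each state back into `step` until it returns EMPTY.
  return of(seed).pipe(expand(step), last(), map((s) => s.found));
}

// Usage with a synchronous stand-in for the API (every fourth value is truthy).
const sample = Array.from({ length: 20 }, (_, k) => k + 1);
firstTruthy(sample, (n) => of(n % 4 === 0), 4).subscribe((found) => {
  console.log(found); // [4, 8, 12, 16]
});
```

Because a batch never holds more predicates than are still needed, the Nth truthy predicate is always the last one that could be sent, so no call is made past it. The sequential-rounds cost described in the caveat at the top of the answer applies here as well.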