I have a stream of predicates I want to send to an API to execute. I only need to identify the first four (for example) that return true.
Although I can send API requests in parallel, each API call is expensive: I want to minimize the number of unnecessary API calls, while still utilizing parallelism where possible.
The simplest solution I found uses concatMap:
predicates$.pipe(
  concatMap(callExpensiveAPI),
  filter(isTruthy),
  take(4)
).subscribe();
However, concatMap only runs a single API request at a time, waiting for each to complete before starting another.
Replacing concatMap with mergeMap maximizes parallelism to a fault: I could be firing off twenty API calls right off the bat when the first four predicates already return true.
The concurrency parameter for mergeMap seems to get me very close to the desired behavior:
predicates$.pipe(
  mergeMap(callExpensiveAPI, 4), // Limit to four concurrent API calls
  filter(isTruthy),
  take(4)
).subscribe();
However, there are still two shortcomings:
The order of predicates (and results) is not preserved. I'll only get four truthy predicates, but their order in the stream is dependent on each API call's response time.
A chance of "overrun" still exists. If the first three predicates are truthy, but the fourth truthful predicate is deep in the stream, the above pipeline will still be firing four API calls at once. There's a chance that, after initiating the fourth API call destined to return true, a few more calls (up to three!) will be made for subsequent predicates in the stream.
At best, those calls are simply unnecessary. At worst, a subsequent call will return true before the actual "fourth truthful" predicate, and the end result will not actually be the first four truthy predicates.
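To make the two shortcomings concrete, here is a small dependency-free simulation. It is only a sketch of how mergeMap with a fixed concurrency, followed by filter and take, would schedule calls; it does not use RxJS itself, and the names SimulatedCall and simulateFixedConcurrency, as well as the latencies, are made up for illustration.

```typescript
// Hypothetical description of one API call: its answer and its response time.
interface SimulatedCall { ok: boolean; latencyMs: number; }

// Simulates mergeMap(call, concurrency) followed by filter + take(needed).
// Returns the indexes of the calls that were started, and the indexes that
// were accepted, in the order their responses arrived.
function simulateFixedConcurrency(tasks: SimulatedCall[], concurrency: number, needed: number) {
  const inFlight: { index: number; finishAt: number }[] = [];
  const started: number[] = [];
  const accepted: number[] = [];
  let nextIndex = 0;

  const start = (now: number) => {
    inFlight.push({ index: nextIndex, finishAt: now + tasks[nextIndex].latencyMs });
    started.push(nextIndex);
    nextIndex++;
  };

  // Fill the pool up to the concurrency limit at time 0.
  while (inFlight.length < concurrency && nextIndex < tasks.length) start(0);

  while (inFlight.length > 0 && accepted.length < needed) {
    // The earliest response arrives first; ties go to the earlier call.
    inFlight.sort((a, b) => a.finishAt - b.finishAt || a.index - b.index);
    const done = inFlight.shift()!;
    if (tasks[done.index].ok) accepted.push(done.index);
    // A freed slot is refilled immediately unless enough results were accepted.
    if (accepted.length < needed && nextIndex < tasks.length) start(done.finishAt);
  }
  return { started, accepted };
}

// Order is lost and one call is wasted: the first two truthy predicates are 0 and 2.
const slowFirst = simulateFixedConcurrency(
  [
    { ok: true, latencyMs: 30 },
    { ok: false, latencyMs: 10 },
    { ok: true, latencyMs: 10 },
    { ok: true, latencyMs: 50 },
  ],
  2,
  2
);
console.log(slowFirst); // { started: [0, 1, 2, 3], accepted: [2, 0] }

// Worst case: the accepted set is not the first two truthy predicates (0 and 1).
const wrongSet = simulateFixedConcurrency(
  [
    { ok: true, latencyMs: 50 },
    { ok: true, latencyMs: 10 },
    { ok: true, latencyMs: 10 },
  ],
  2,
  2
);
console.log(wrongSet); // { started: [0, 1, 2], accepted: [1, 2] }
```

In the first run, call 3 is started even though calls 0 and 2 already cover the two results needed; in the second, a fast later call displaces the slow first one.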
Ideally, I would like an RxJS pipeline where the concurrency parameter "shrinks" for each approved item in the downstream pipeline. When only one more predicate is needed, the concurrency parameter would be 1, causing mergeMap to behave like concatMap.
How might I implement such a pipeline?
I found a bottleneck in this approach. Suppose your buffer window is 15 and 14 truthy values have already been found: the buffer window is now 1, but the remaining truthy element is 14 elements away. This causes 14 sequential executions before the last one is found. So I would say the approach in your question is more efficient for larger buffer windows, but not for smaller ones, where you could try the approach below.
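The arithmetic behind that bottleneck can be checked with a small simulation. This is only a sketch under assumptions: the function name simulateShrinkingBatches and the layout of outcomes (14 truthy values, then 13 falsy ones, then the final truthy one) are invented to match the scenario described above.

```typescript
// Counts rounds and API calls when each round sends exactly as many
// predicates in parallel as there are results still missing.
function simulateShrinkingBatches(outcomes: boolean[], needed: number) {
  const found: number[] = []; // indexes of truthy predicates, in stream order
  let cursor = 0;
  let rounds = 0;
  let calls = 0;

  while (found.length < needed && cursor < outcomes.length) {
    const remaining = needed - found.length;
    const batch = outcomes.slice(cursor, cursor + remaining);
    rounds++;
    calls += batch.length;
    batch.forEach((ok, offset) => {
      if (ok) found.push(cursor + offset);
    });
    cursor += batch.length;
  }
  return { rounds, calls, found };
}

// Assumed layout: 14 truthy, then 14 falsy, then one truthy predicate.
const outcomes = [
  ...Array(14).fill(true),
  ...Array(14).fill(false),
  true,
];
const summary = simulateShrinkingBatches(outcomes, 15);
console.log(summary.rounds, summary.calls); // 15 29
```

The first round covers 15 predicates in parallel and finds 14 of them; the remaining 14 rounds each send a single call, which is the sequential tail described above.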
This is my best attempt at your problem. We use a BehaviorSubject to trigger each emission of the buffer, and we store the remaining buffer count in a local variable, concurrencyCount.
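let i = 0;
let concurrencyCount = 4;
// Typed explicitly so that pushing results type-checks in strict mode.
const subject = new BehaviorSubject<boolean[]>([]);
const predicates$ = subject.pipe(
  startWith(null),
  // Every emission of the subject starts a fresh batch of concurrencyCount items.
  switchMap(() =>
    interval(0).pipe(
      take(concurrencyCount),
      map(() => {
        i++;
        return i;
      }),
      toArray()
    )
  )
);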
The following function simulates the expensive API call: it answers true for every fourth value, after a one-second delay.
const callExpensiveAPI = (val: any) => {
  console.log('inside merge map', val, ' - ', val % 4 === 0);
  return of(val % 4 === 0).pipe(delay(1000));
};
predicates$
  .pipe(
    // ensure each buffer happens sequentially.
    concatMap((items: any) => {
      return from(items).pipe(
        // expensive api
        mergeMap(callExpensiveAPI, Infinity),
        // filter out negative values
        filter(isTruthy),
        // ensure parallel processing is completed before evaluating
        toArray(),
        map((items: any) => {
          // minus the count from the concurrencyCount
          concurrencyCount -= items.length;
          return items;
        })
      );
    }),
We pipe the predicates$ observable and convert each emitted batch into a stream using from, then use mergeMap for parallel execution (the size of the buffer determines how many calls run in parallel).
We use filter, which you already have, to drop the false values, and toArray to ensure the parallel processing has completed before evaluating. We then use map to subtract the number of truthy results from the concurrencyCount variable.
The BehaviorSubject stores the truthy values found so far as state, which we can use later.
Finally, we use takeWhile to keep going only while concurrencyCount is greater than zero.
tap((values: any) => subject.next([...subject.value, ...values])),
takeWhile(() => concurrencyCount > 0)
Once a batch has completed and concurrencyCount has been updated, the subject.next call in tap makes predicates$ emit the next batch, sized by the new concurrencyCount. The next block of subscribe only logs progress.
The complete block finally outputs the filtered values, which are simply the values stored in the BehaviorSubject.
complete: () => {
console.log('complete', subject.value);
},
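For comparison, the same batching idea can be sketched with plain Promises instead of RxJS. This is not the pipeline above, only an illustration of its logic under assumptions: firstTruthyInBatches and check are hypothetical names, and check stands in for the expensive API call. Unlike the demo, it returns the matching items themselves, in stream order.

```typescript
// Hypothetical stand-in for the expensive API call.
type Check<T> = (item: T) => Promise<boolean>;

// Sends batches whose size equals the number of results still missing,
// waits for the whole batch, then keeps the truthy items in stream order.
async function firstTruthyInBatches<T>(
  items: readonly T[],
  check: Check<T>,
  needed: number
): Promise<T[]> {
  const found: T[] = [];
  let cursor = 0;

  while (found.length < needed && cursor < items.length) {
    const batch = items.slice(cursor, cursor + (needed - found.length));
    cursor += batch.length;
    // All calls of one batch run in parallel; verdicts keep the batch order.
    const verdicts = await Promise.all(batch.map((item) => check(item)));
    batch.forEach((item, k) => {
      if (verdicts[k]) found.push(item);
    });
  }
  return found;
}

// Usage with the same mock as the demo: every fourth number is "truthy".
const numbers = Array.from({ length: 20 }, (_, k) => k + 1);
firstTruthyInBatches(numbers, async (n) => n % 4 === 0, 4).then((result) => {
  console.log(result); // [4, 8, 12, 16]
});
```

Because a batch never contains more items than there are results still missing, it can never overshoot, and because results are read in batch order, the response times do not affect the output order.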
Here is the complete example:
import {
  of,
  map,
  mergeMap,
  interval,
  filter,
  take,
  from,
  delay,
  tap,
  concatMap,
  toArray,
  takeWhile,
  startWith,
  switchMap,
  BehaviorSubject,
} from 'rxjs';

let i = 0;
let concurrencyCount = 4;
// Typed explicitly so that pushing results type-checks in strict mode.
const subject = new BehaviorSubject<boolean[]>([]);
const predicates$ = subject.pipe(
  startWith(null),
  // Every emission of the subject starts a fresh batch of concurrencyCount items.
  switchMap(() =>
    interval(0).pipe(
      take(concurrencyCount),
      map(() => {
        i++;
        return i;
      }),
      toArray()
    )
  )
);

// Simulated expensive API: true for every fourth value, after one second.
const callExpensiveAPI = (val: any) => {
  console.log('inside merge map', val, ' - ', val % 4 === 0);
  return of(val % 4 === 0).pipe(delay(1000));
};
const isTruthy = (val: any) => !!val;

predicates$
  .pipe(
    // ensure each buffer happens sequentially.
    concatMap((items: any) => {
      return from(items).pipe(
        // expensive api
        mergeMap(callExpensiveAPI, Infinity),
        // filter out negative values
        filter(isTruthy),
        // ensure parallel processing is completed before evaluating
        toArray(),
        map((items: any) => {
          // minus the count from the concurrencyCount
          concurrencyCount -= items.length;
          return items;
        })
      );
    }),
    // store the results and trigger the next batch
    tap((values: any) => subject.next([...subject.value, ...values])),
    takeWhile(() => concurrencyCount > 0)
  )
  .subscribe({
    next: (values: any) => {
      console.log('next', subject.value, values);
    },
    complete: () => {
      console.log('complete', subject.value);
    },
  });