I have an Observable where each new value should cause an HTTP request. On the client-side I only care about the latest response value; however, I want every request to complete for monitoring/etc. purposes.
What I currently have is something like:
function simulate(x) {
// Simulate an HTTP request.
return of(x).pipe(delay(6));
}
source$.pipe(
someMapFunc(x => simulate(x)),
);
When I use switchMap
for the someMapFunc
, I get the right set of responses (only the latest). However, if the request is taking too long, it will get canceled.
When I use mergeMap
instead, I get the right set of requests (every request completes), but I get the wrong set of responses (every single one).
Is there a way to get the requests of mergeMap
with the responses of switchMap
? I know I can write this as a custom operator, but I'm wondering if I can build this out of existing/standard rxjs operators. To summarize what I'm thinking of:
switchMap
that doesn't unsubscribe when it switches;mergeMap
that only emits values from the latest inner Observable.Edit: Based on the accepted answer, I was able to get the following, which works:
function orderedMergeMap(project) {
return (s) => defer(() => {
let recent = 0;
return s.pipe(
mergeMap((data, idx) => {
recent = idx;
return project(data).pipe(filter(() => idx === recent));
})
);
});
}
I'm not 100% sure if this is what you're after, and I haven't fully tested this, but I created a custom operator that might do something close to what you're after. Maybe you can tinker with it a bit more.
This is a mergeMap that filters out "old" values. Old values are emissions from sources that happen after a newer source starts to emit.
function orderedMergeMap<T, R>(project: (v:T) => Observable<R>): OperatorFunction<T, R> {
return s => defer(() => {
let recent = 0;
return s.pipe(
map((v, i) => ({order: i, payload: v})),
mergeMap(({order, payload}) => project(payload).pipe(
map(v => ({order, payload: v}))
)),
tap(({order}) => {
if(order > recent) recent = order;
}),
filter(({order}) => order < recent),
map(({payload}) => payload)
);
});
}
The version OP settled on:
function orderedMergeMap<T, R>(project: (v:T) => Observable<R>): OperatorFunction<T, R> {
return s => defer(() => {
let recent = 0;
return s.pipe(
mergeMap((data, idx) => {
recent = idx;
return project(data).pipe(
filter(() => idx === recent)
);
})
);
});
}