I'm using the withLatestFrom operator in RxJS in the normal way:
var combined = source1.withLatestFrom(source2, source3);
...to actively collect the most recent emission from source2 and source3 and to emit all three values only when source1 emits.
But I cannot guarantee that source2 or source3 will have produced values before source1 produces a value. Instead, I need to wait until all three sources have produced at least one value each before letting withLatestFrom do its thing.
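To illustrate the problem, here is a minimal plain-JavaScript sketch of how I understand withLatestFrom to behave. It is a model, not RxJS itself: `withLatestFromModel` and the `{ from, value }` event shape are names made up for this illustration.

```javascript
// Hypothetical model (not the RxJS implementation): replays events in order
// and returns what source1.withLatestFrom(source2, source3) would emit.
function withLatestFromModel(events, otherNames) {
  const latest = new Map(); // latest value seen per "other" stream
  const out = [];
  for (const { from, value } of events) {
    if (from !== 'source1') {
      latest.set(from, value);
    } else if (otherNames.every((name) => latest.has(name))) {
      out.push([value, ...otherNames.map((name) => latest.get(name))]);
    }
    // Otherwise the source1 value arrived too early and is silently dropped.
  }
  return out;
}

// source1 emits first, so its only value is lost and nothing comes out.
const dropped = withLatestFromModel(
  [
    { from: 'source1', value: 1 },
    { from: 'source2', value: 'a' },
    { from: 'source3', value: 'x' },
  ],
  ['source2', 'source3']
);
console.log(JSON.stringify(dropped)); // []
```

The early value is never replayed, which is exactly the case I need to handle.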
The contract needs to be: if source1 emits, then combined will always eventually emit once the other sources finally produce. If source1 emits multiple times while waiting for the other sources, we can use the latest value and discard the previous values.
Edit: as a marble diagram:
--1------------2---- (source)
----a-----b--------- (other1)
------x-----y------- (other2)
------1ax------2by-- (combined)

--1------------2---- (source)
------a---b--------- (other1)
--x---------y------- (other2)
------1ax------2by-- (combined)

------1--------2---- (source)
----a-----b--------- (other1)
--x---------y------- (other2)
------1ax------2by-- (combined)
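To make the contract in the diagrams concrete, here is a minimal plain-JavaScript model of the behaviour I'm after. It is only a sketch: `simulateCombined`, `ev`, and the `{ from, value }` event shape are made-up names rather than RxJS API, and the event list is hand-transcribed from the first diagram.

```javascript
// Hypothetical reference model (not RxJS): replays events in order and
// returns what `combined` should emit under the contract described above.
function simulateCombined(events, otherNames) {
  const latest = new Map(); // latest value per "other" stream
  let pending;              // newest source value not yet emitted
  let hasPending = false;
  const out = [];

  for (const { from, value } of events) {
    if (from === 'source') {
      pending = value;      // a newer source value replaces an older waiting one
      hasPending = true;
    } else {
      latest.set(from, value);
    }
    const ready = otherNames.every((name) => latest.has(name));
    if (hasPending && ready) {
      out.push([pending, ...otherNames.map((name) => latest.get(name))]);
      hasPending = false;   // later "other" values alone do not trigger an emit
    }
  }
  return out;
}

const ev = (from, value) => ({ from, value });

// First diagram: source emits before either other stream has a value.
const first = simulateCombined(
  [ev('source', 1), ev('other1', 'a'), ev('other2', 'x'),
   ev('other1', 'b'), ev('other2', 'y'), ev('source', 2)],
  ['other1', 'other2']
);
console.log(JSON.stringify(first)); // [[1,"a","x"],[2,"b","y"]]
```

Once every stream has a value, the model behaves like withLatestFrom; before that, it holds on to the latest source value instead of dropping it.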
I can make a custom operator for this, but I want to make sure I'm not missing an obvious way to do this using the vanilla operators. It feels almost like I want combineLatest for the initial emit and then to switch to withLatestFrom from then on, but I haven't been able to figure out how to do that.
Edit: Full code example from final solution:
var Dispatcher = new Rx.Subject();
var source1 = Dispatcher.filter(x => x === 'foo');
var source2 = Dispatcher.filter(x => x === 'bar');
var source3 = Dispatcher.filter(x => x === 'baz');
var combined = source1.publish(function(s1) {
  return source2.publish(function(s2) {
    return source3.publish(function(s3) {
      var cL = s1.combineLatest(s2, s3).take(1).do(() => console.log('cL'));
      var wLF = s1.skip(1).withLatestFrom(s2, s3).do(() => console.log('wLF'));
      return Rx.Observable.merge(cL, wLF);
    });
  });
});
var sub1 = combined.subscribe(x => console.log('x', x));
// These can arrive in any order
// and we can get multiple values from any one.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
Dispatcher.onNext('baz');
// combineLatest triggers once we have all values.
// cL
// x ["foo", "bar", "baz"]
// withLatestFrom takes over from there.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
// wLF
// x ["foo", "bar", "baz"]
// wLF
// x ["foo", "bar", "baz"]
Edit, edit: There seems to be a slight timing problem when handing control from cL to wLF. That may be solvable, but I changed the accepted answer to a simpler proposal instead. combineLatest has almost all of the needed behavior; the one difference (it also emits whenever one of the other sources emits) is easier to suppress than trying to retrofit withLatestFrom or building a new custom operator:
Rx.Observable.prototype.waitLatestFrom = function (...args) {
  const selectorFn = (typeof args.at(-1) === 'function')
    ? args.pop()
    : (...values) => values;
  return this
    .map((value, index) => ({ value, index }))
    .combineLatest(...args)
    .distinctUntilChanged(([x]) => x.index)
    .map(([x, ...xs]) => selectorFn(x.value, ...xs));
};
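To see why tagging each source value with its index and then deduplicating on that index is enough, here is a plain-JavaScript model of the three steps in waitLatestFrom. It is a sketch under my own assumptions: `waitLatestFromModel` and the `{ from, value }` event shape are hypothetical names, and the combineLatest step is a simplified stand-in rather than the library's code.

```javascript
// Hypothetical model (not RxJS) of: map to {value, index} -> combineLatest
// -> distinctUntilChanged on index -> unwrap.
function waitLatestFromModel(events, otherNames) {
  const names = ['source', ...otherNames];
  const latest = new Map();
  let sourceIndex = 0;
  const combined = [];

  // Tag source values with an index, then emit a snapshot on every event
  // once all streams have a value (the combineLatest behaviour).
  for (const { from, value } of events) {
    latest.set(from, from === 'source' ? { value, index: sourceIndex++ } : value);
    if (names.every((name) => latest.has(name))) {
      combined.push(names.map((name) => latest.get(name)));
    }
  }

  // Keep a snapshot only when the source index changed, then unwrap it.
  const out = [];
  let lastIndex = -1;
  for (const [src, ...others] of combined) {
    if (src.index !== lastIndex) {
      lastIndex = src.index;
      out.push([src.value, ...others]);
    }
  }
  return out;
}

const result = waitLatestFromModel(
  [
    { from: 'source', value: 1 },
    { from: 'source', value: 2 },   // replaces 1 while still waiting
    { from: 'other1', value: 'a' },
    { from: 'other2', value: 'x' }, // all present: emits with source value 2
    { from: 'other1', value: 'b' }, // same source index: suppressed
    { from: 'source', value: 3 },   // new source index: emits
  ],
  ['other1', 'other2']
);
console.log(JSON.stringify(result)); // [[2,"a","x"],[3,"b","x"]]
```

Snapshots caused only by other1 or other2 carry an unchanged source index, so the dedup step removes them and only source-driven emissions (plus the first complete snapshot) get through.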
This simple solution works in my cases:
import {
  combineLatestWith,
  distinctUntilChanged,
  map,
  ObservableInputTuple,
  OperatorFunction,
  pipe
} from "rxjs";
export function awaitLatestFrom<T, O extends unknown[]>(...inputs: [...ObservableInputTuple<O>]): OperatorFunction<T, [T, ...O]> {
  return pipe(
    map((value, index) => [value, index] as const),
    combineLatestWith(...inputs),
    distinctUntilChanged(([[, prevIndex]], [[, currentIndex]]) => prevIndex === currentIndex),
    map(([[sourceValue], ...inputValues]) => [sourceValue, ...inputValues])
  );
}