Theoretically, it should be possible to implement any RxJS operator (except `just()` and `flatMap()`) through `flatMap()`. For instance, `map()` can be implemented as:
/**
 * `map` expressed purely in terms of `flatMap` and `just`.
 *
 * @param source   - the stream whose values are transformed; must expose `flatMap`
 * @param selector - function applied to every value of `source`
 * @returns whatever `source.flatMap` returns when each value is replaced by
 *          a single-element stream holding `selector(value)`
 */
function map(source, selector) {
  // Wrap each projected value in a one-element observable so that flatMap
  // flattens it straight back into the output.
  const project = (value) => Rx.Observable.just(selector(value));
  return source.flatMap(project);
}
How can `merge()` be implemented through `flatMap()`? (Avoiding `mergeAll()` too, of course.)
It looks possible if you take advantage of the fact that the function passed to `flatMap` can also return an array, which is then flattened into the output just like an observable would be.
/**
 * `merge` expressed using only `just` and `flatMap`.
 *
 * @param other - the second observable to combine with this one
 * @returns the result of flattening `[this, other]` twice with `flatMap`
 */
Rx.Observable.prototype.merge = function (other) {
  // A single emission carrying both observables as an array.
  const pair = Rx.Observable.just([this, other]);

  // First pass: flatten the array into an observable of observables.
  const observables = pair.flatMap((arr) => arr);

  // Second pass: flatten out the inner observables themselves.
  return observables.flatMap((inner) => inner);
};
EDIT 1
Using RxJS 6 and the `pipe` syntax:
import {of} from 'rxjs'
import {flatMap} from 'rxjs/operators'
/**
 * Pipeable `merge` built only from `of` and `flatMap`.
 *
 * @param other - the second stream to combine with the piped source
 * @returns a function that takes the source stream and returns the result of
 *          flattening `[source, other]` twice with `flatMap`
 */
function merge(other) {
  return (source) =>
    of([source, other]).pipe(
      // Flattens the array into an observable of observables.
      flatMap((arr) => arr),
      // Flattens out the observables. The comma after the previous operator is
      // required: `pipe` takes the operators as separate arguments.
      flatMap((x) => x)
    );
}
// Operators and creation functions are read from the `rxjs` global;
// `of` is aliased to `just` for the code below.
const {timestamp, map, flatMap, take} = rxjs.operators;
const {interval, of: just} = rxjs;

// First demo stream: interval(2000), each value stamped and rendered as text.
const source1 = interval(2000).pipe(
  timestamp(),
  map(({timestamp: at, value}) => "Interval 1 at " + at + " w/ " + value)
);

// Second demo stream: interval(3000), rendered the same way with its own label.
const source2 = interval(3000).pipe(
  timestamp(),
  map(({timestamp: at, value}) => "Interval 2 at " + at + " w/ " + value)
);
/**
 * Pipeable merge built from `just` and two `flatMap` passes.
 *
 * @param other - the stream to combine with the piped source
 * @returns a function that takes the source stream and returns the result of
 *          flattening `[source, other]` twice
 */
function mergeFromFlatMap(other) {
  return function mergeWith(source) {
    const identity = (value) => value;
    // One emission holding both streams as an array.
    const pair = just([source, other]);
    // First flatMap unpacks the array, second unpacks each stream.
    return pair.pipe(flatMap(identity), flatMap(identity));
  };
}
// Combine both demo streams, stop after 20 values, and print each one.
const mergedDemo = source1.pipe(mergeFromFlatMap(source2), take(20));
mergedDemo.subscribe((...args) => console.log(...args));
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>