javascript reactive-programming rxjs reactive-extensions-js

Merge implemented as flatMap


Theoretically, it should be possible to implement any RxJS operator (except just() and flatMap()) through flatMap(). For instance, map() can be implemented as:

/**
 * map() expressed through flatMap().
 *
 * @param source   - object exposing a `flatMap(project)` method.
 * @param selector - function applied to each value handed to the projection.
 * @returns whatever `source.flatMap` returns for the wrapping projection.
 */
function map(source, selector) {
  // Each projected value is wrapped with Rx.Observable.just so that flatMap
  // has something to flatten.
  const wrapProjected = x => Rx.Observable.just(selector(x));
  return source.flatMap(wrapProjected);
}

How to implement merge() through flatMap()? (avoiding mergeAll() too, of course)


Solution

  • It looks possible if you take advantage of the fact that the selector passed to flatMap may also return an array, which is then flattened just like an observable would be.

    // merge() expressed purely through just() and flatMap().
    // `other` is the second stream to combine with the receiver (`this`).
    Rx.Observable.prototype.merge = function(other) {
      const passThrough = function(value) { return value; };
      // A single emission carrying both streams as one array.
      const bothStreams = Rx.Observable.just([this, other]);
      return bothStreams
        // Flattens the array into an observable of observables
        .flatMap(passThrough)
        // Flattens out the inner observables
        .flatMap(passThrough);
    };
    

    EDIT 1

    Using RxJS 6 and the pipe syntax

    import {of} from 'rxjs'
    import {flatMap} from 'rxjs/operators'
    
    /**
     * Pipeable merge built only from `of` and `flatMap`.
     *
     * @param other - the second stream to combine with the piped source.
     * @returns a function that receives the source stream and returns the
     *          result of flattening `[source, other]` twice.
     */
    function merge (other) {
      return source => of([source, other]).pipe(
        // Flattens the array into an observable of observables
        flatMap(arr => arr),
        // Flattens out the inner observables
        flatMap(x => x)
      );
    }
    

    const {timestamp, map, flatMap, take} = rxjs.operators;
    const {interval, of: just} = rxjs;
    
    // First demo stream: interval(2000), timestamped, then rendered as a log line.
    const source1 = interval(2000).pipe(
      timestamp(),
      map(({timestamp: at, value}) => `Interval 1 at ${at} w/ ${value}`)
    );
    
    // Second demo stream: interval(3000), timestamped, then rendered as a log line.
    const source2 = interval(3000).pipe(
      timestamp(),
      map(({timestamp: at, value}) => `Interval 2 at ${at} w/ ${value}`)
    );
    
    /**
     * Pipeable merge built from `just` (an alias of `of`) and `flatMap` only.
     *
     * @param other - the second stream to combine with the piped source.
     * @returns a function taking the source stream and returning the result
     *          of flattening `[source, other]` twice.
     */
    function mergeFromFlatMap (other) {
      const identity = value => value;
      return function (source) {
        // One emission holding both streams; the first flatMap unpacks the
        // array, the second unpacks each stream.
        const pair = just([source, other]);
        return pair.pipe(flatMap(identity), flatMap(identity));
      };
    }
    
    // Combine both demo streams with the flatMap-based merge, cap the output
    // with take(20), and log every emitted line.
    const mergedLines = source1.pipe(mergeFromFlatMap(source2), take(20));
    mergedLines.subscribe(console.log.bind(console));
    <!-- Pinned to the RxJS 6 line this snippet targets: an unpinned URL resolves to the latest release, where the UMD bundle is no longer published under /bundles/. -->
    <script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>