angulartypescriptrxjscombinelatest

Combine two observables, result selector function, zip


I'm working on some legacy code and there is an issue with wrong status updates of the device. Problem is, that the old - legacy state update is done using long polling and it has concurrency issues on the BE which won't be fixed (BE deprecated). Sometimes it forgets to update the state. The messages arrive but it's incorrect.

I can retrieve the correct state by requesting it from the new BE (the app is a nasty hybrid right now) but I'm not able to do it correctly. My first idea was to use whitLatestFrom but it doesn't work for obvious reasons.

legacyService.getState<LegacyState>()
  .pipe(
    subscribeOn(queue),
    withLatestFrom(this.realService.getRealStatus()), // request to the new BE
    map(([legacy, real]) => {
      // This if statement is a must. getRealStatus() returns only partial state, not the whole DeviceState
      // If there is a difference in the state from two sources, I create a new object with correct state
      if (legacy.state === DeviceState.INITIALIZED && !real.stable) {
        return new LegacyState(
          legacy.instrument, 
          legacy.db,
          DeviceState.UNSTABLE, 
        );
      }
      return event; // states are same, returning state from legacy source
    })
  )
  .subscribe(event => {
    this._deviceStateUpdated$.next(event);
  });

Works on app restart/reload but later the real state is not updated as no new call is made and it just returns previous value. Same happens with combineLatest. First one (from polling) is updated, second just returns previous value.

How can I combine two observables in such way, that when the first one is updated I force the update of the new value for the second observable also? And of course, I'm able to process both of them, as the second observable returns just a partial state. I tried multiple maps (swtichMap, concatMap, ...) but without success.


Solution

  • What I called a projection function is more accurately called a result selector function. See example 3 on this page. The basic structure I would use is this:

    import { interval } from 'rxjs';
    import { switchMap } from 'rxjs/operators';
    
    //whatever your long polling interval is
    const polling = interval(10000);
    
    //then use that to collect your states 
    const collectState = polling
        //pipe the interval event
       .pipe(
          //first you need the real state so you switchMap into that
          //I'm assuming this is Rx can treat as an observable, like a promise
          switchMap(this.realService.getRealStatus()),
          //then you have your real status and you need to use a result selector function
          switchMap(() => 
             legacyService.getState(),
             //at this point in the result selector function you have access to both states
             (real, legacy) => {
                 //so you can apply your logic
                 if (legacy.state === DeviceState.INITIALIZED && !real.stable) {
                     return new LegacyState(
                        legacy.instrument, 
                        legacy.db,
                        DeviceState.UNSTABLE
                    );
                 } else { return legacy }
          })
        );
      
    

    I realise that I have unhelpfully switched the real / legacy order around but you get the gist.

    Another way of doing this would be to create an interval observable and a zipped observable which only emits when the both real and legacy states have emitted

    import { zip, interval } from 'rxjs';
    import { switchMap, map } from 'rxjs/operators';
    
    const polling = interval(10000);
    const states = zip(
        legacyService.getState(),
        this.realService.getRealStatus()
    );
    const collectState = polling
        .pipe(
            switchMap(states),
            map(statesArray => {
               //do the work here
            })
        );
    

    );

    Hope something here helps