typescript rxjs

Signal that an Observable with shareReplay is "stale"


I have a situation with shareReplay(1). Recalculating result$ is expensive, so I don't want to do it unless required, which is why I'm using shareReplay(1) in the first place.

refreshSubject = new BehaviorSubject<void>(undefined);

// Assume this has many subscriptions.
obs$ = refreshSubject.pipe(
  // My actual logic here is more complicated; the important part is that
  // there is some time delay before result$ emits.
  switchMap(() => timer(1000)),
  switchMap(() => result$),
  shareReplay(1),
);

Now suppose I call refreshSubject.next() to "reset" the value of obs$ (after some logic that would change the value of result$) and want to immediately use the new, up-to-date value of obs$, like so:

refreshSubject.next();

obs$.pipe(take(1)).subscribe(value => {
  ...
});

The subscription seems to receive the old value, presumably because result$ hasn't emitted yet (due to the time delay), so shareReplay(1) hasn't yet "realised" that its buffered value is "stale".
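To make the behaviour concrete, here is a minimal synchronous sketch of the same situation. It assumes RxJS 7.2+ (operators imported from 'rxjs'), and it swaps the timer for a plain Subject named result$ that is pushed by hand, so the "delay" is simply the gap before the next manual emission. The names refresh$ and seen are illustrative only.

```typescript
import { BehaviorSubject, Subject, shareReplay, switchMap, take } from 'rxjs';

const refresh$ = new BehaviorSubject<void>(undefined);
// Hypothetical stand-in for the expensive, delayed result.
const result$ = new Subject<string>();

const obs$ = refresh$.pipe(
  switchMap(() => result$),
  shareReplay(1),
);

const seen: string[] = [];

obs$.subscribe();      // keeps the shared pipeline connected
result$.next('old');   // shareReplay(1) now buffers 'old'

refresh$.next();       // refresh requested, but no new result exists yet

// A late subscriber is handed the buffered value straight away.
obs$.pipe(take(1)).subscribe(value => seen.push(value));

result$.next('new');   // arrives too late for the take(1) subscriber

console.log(seen);     // ['old']
```

The late subscriber completes with 'old' because nothing between refresh$ and shareReplay(1) emits when the refresh starts, so the buffer is never touched until the new result arrives.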

Is there an elegant way to "tell" the shareReplay Observable that its stored value is no longer relevant (so any subscriptions after the refresh will use the next emitted value whether it occurs before/after they subscribe)?


Solution

  • I ended up adapting this answer (https://stackoverflow.com/a/28945444/22825680) to create a "refreshable" replay subject, which is passed refreshSubject in my example, and then wrote an operator that uses it to replace shareReplay(1) in these instances:

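    // Assumes RxJS 7.2+, where operators are exported from 'rxjs'.
    import {
      MonoTypeOperatorFunction, Observable, Observer, ReplaySubject,
      SubjectLike, Subscription, concatAll, share,
    } from 'rxjs';

    // Adapted from https://stackoverflow.com/a/28945444/22825680
    class CacheSubject<T> implements SubjectLike<T>
    {
      // Emits each new inner subject; only the latest one is replayed.
      private readonly mySubjects: ReplaySubject<Observable<T>>;
      // Flattens the inner subjects into one continuous stream.
      private readonly myConcatenatedSubjects: Observable<T>;
      private myCurrentSubject: ReplaySubject<T>;
      private readonly myResetSubscription: Subscription | undefined;

      constructor(resetSignal$?: Observable<void>)
      {
        this.mySubjects = new ReplaySubject<Observable<T>>(1);
        this.myConcatenatedSubjects = this.mySubjects.pipe(
          concatAll(),
        );
        // Buffer size 1 so that only the latest value is replayed,
        // matching shareReplay(1).
        this.myCurrentSubject = new ReplaySubject<T>(1);
        this.mySubjects.next(this.myCurrentSubject);

        // Subscribe last: a BehaviorSubject calls reset() synchronously.
        this.myResetSubscription = resetSignal$?.subscribe({
          next: () =>
          {
            this.reset();
          },
        });
      }

      public reset(): void
      {
        // Completing the current subject lets concatAll() move on to the
        // new, empty one, so the stale value is no longer replayed.
        this.myCurrentSubject.complete();
        this.myCurrentSubject = new ReplaySubject<T>(1);
        this.mySubjects.next(this.myCurrentSubject);
      }

      public next(value: T): void
      {
        this.myCurrentSubject.next(value);
      }

      public error(err: any): void
      {
        this.myCurrentSubject.error(err);
        // Stop listening for resets so this instance can be collected.
        this.myResetSubscription?.unsubscribe();
      }

      public complete(): void
      {
        this.myCurrentSubject.complete();
        this.mySubjects.complete();
        this.myResetSubscription?.unsubscribe();

        // Make current subject unreachable.
        this.myCurrentSubject = new ReplaySubject<T>(1);
      }

      public subscribe(observer: Partial<Observer<T>>): Subscription
      {
        return this.myConcatenatedSubjects.subscribe(observer);
      }
    }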
    
    // Acts like shareReplay(1) but clears its buffer when resetSignal$ emits.
    function cache<T>(
      resetSignal$?: Observable<void>,
    ): MonoTypeOperatorFunction<T>
    {
      return share<T>({
        connector: () =>
        {
          return resetSignal$ == null
            ? new ReplaySubject<T>(1)
            : new CacheSubject<T>(resetSignal$);
        },
        resetOnError: true,
        resetOnComplete: false,
        resetOnRefCountZero: false,
      });
    }
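The core trick in CacheSubject is a ReplaySubject of ReplaySubjects flattened with concatAll(). The following stripped-down sketch isolates that mechanism without the class or the share() wiring. It assumes RxJS 7.2+, and the names subjects, values$, current and received are illustrative only.

```typescript
import { Observable, ReplaySubject, concatAll, take } from 'rxjs';

// The outer subject replays only the most recent inner subject.
const subjects = new ReplaySubject<Observable<string>>(1);
const values$ = subjects.pipe(concatAll());

let current = new ReplaySubject<string>(1);
subjects.next(current);
current.next('old');

// "Reset": complete the stale inner subject and swap in an empty one.
current.complete();
current = new ReplaySubject<string>(1);
subjects.next(current);

const received: string[] = [];

// A late subscriber is routed to the new, empty inner subject,
// so nothing is replayed at subscription time.
values$.pipe(take(1)).subscribe(value => received.push(value));
const afterSubscribe = [...received];

current.next('new');

console.log(afterSubscribe); // []
console.log(received);       // ['new']
```

Because the outer subject has a buffer of one, a subscriber arriving after the reset never sees the completed inner subject, which is what lets it wait for the next value instead of receiving the stale one.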