javascriptrxjssubjectreplaysubject

Select ReplaySubject values emitted untill now


Is there reliable solution to get a snapshot of values currently buffered in ReplaySubject?

I came up with something like this, but not sure how reliable this solution is:

replaySubject.pipe(
    takeUntil(timer(10).pipe(take(1)))
)

The timer factor above seems wrong to me as I have no warranty if all values have been emitted properly within given time (this applies to replay subjects with a lot of values).


Solution

  • I think this can be an approach:

    replay
      .pipe(
        buffer(timer(0)),
        take(1)
      )
      .subscribe(a => {
        console.log(`This is one time value ${a}`);
      });
    

    StackBlitz demo.

    The ReplaySubject, when a new subscriber is registered, it will emit its values synchronously:

    for (let i = 0; i < copy.length && !subscriber.closed; i += infiniteTimeWindow ? 1 : 2) {
          subscriber.next(copy[i] as T);
        }
    

    for this reason, we're using buffer(timer(0)): by the time the buffer's notifier(timer(0)) emits, the values will have already been added in the buffer's array. So, it will be able to emit all the ReplaySubject's accumulated values.