rxjsobservableretrywhen

RxJS Observable: retry using count and then using notifier


Firstly, I'd like to retry using a simple count:

(Preferably, the error may be emitted immediately after each retry, but retry(count) does not seem to do this.)

If I understand correctly, this is the behaviour of retry(count):

{
  new Rx.Observable(observer => {
    console.log("subscribe");
    observer.error("ERROR");
  })
    .retry(3)
    .subscribe({
      error: console.log
    });
}
// subscribe
// subscribe
// subscribe
// subscribe
// ERROR

Then, I would like to allow the user to retry manually. When a retry notifier observable (retry$) emits, retry the observable again, emitting the error each time afterwards.

I tried to use retryWhen for this, however whilst the retries do occur, the error is never emitted.

I want to retry but also emit any errors, so that I can display them in the user interface whilst the retry is running.

{
  const retrySubject = new Rx.Subject();
  const retry$ = retrySubject.asObservable();
  new Rx.Observable(observer => {
    console.log("subscribe");
    observer.error("ERROR");
  })
    .retryWhen(() => retry$)
    .subscribe({
      error: console.log
    });
  retrySubject.next();
}
// subscribe
// subscribe

Furthermore, I'm not sure how to combine this with retry(count). If I chain the retry operators, the would trigger each other.


Solution

  • retryWhen provides a stream of errors - you can peek the stream and ignore it after 3 emissions and then only retry when triggered by user.

    const retrySubject = new Rx.Subject();
    const retry$ = retrySubject.asObservable();
    new Rx.Observable(observer => {
      console.log("subscribe");
      observer.error("ERROR");
    })
      .retryWhen(errors => Rx.Observable.merge(
        errors
          .do(error => console.log(error)) // log error
          .filter((error, index) => index < 3), // take only first 3 and ignore the rest
        retry$ // also retry with user request
      ))
      .subscribe();
    
    retrySubject.next();
    

    You could use take(3) instead of filter but that would stop the errors stream so logging of errors would also stop. filter operator will keep it 'running'.