
RxJS: takeUntil() Angular component's ngOnDestroy()


tl;dr: I want to marry Angular's ngOnDestroy with the RxJS takeUntil() operator. Is that possible?

I have an Angular component that opens several RxJS subscriptions. These need to be closed when the component is destroyed.

A simple solution for this would be:

class myComponent {

  private subscriptionA;
  private subscriptionB;
  private subscriptionC;

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    this.subscriptionA = this.serviceA.subscribe(...);
    this.subscriptionB = this.serviceB.subscribe(...);
    this.subscriptionC = this.serviceC.subscribe(...);
  }

  ngOnDestroy() {
    this.subscriptionA.unsubscribe();
    this.subscriptionB.unsubscribe();
    this.subscriptionC.unsubscribe();
  }

}

This works, but it's a bit redundant. I especially don't like that:

- The unsubscribe() is somewhere else, so you gotta remember that these are linked.
- The component state is polluted with the subscription.

I would much prefer using the takeUntil() operator or something similar, to make it look like this:

class myComponent {

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    const destroy = Observable.fromEvent(???).first();
    this.subscriptionA = this.serviceA.subscribe(...).takeUntil(destroy);
    this.subscriptionB = this.serviceB.subscribe(...).takeUntil(destroy);
    this.subscriptionC = this.serviceC.subscribe(...).takeUntil(destroy);
  }

}

Is there a destroy event or something similar that would let me use takeUntil() or another way to simplify the component architecture like that? I realize I could create an event myself in the constructor or something that gets triggered within ngOnDestroy() but that would in the end not make things that much simpler to read.


Solution

  • You could leverage a ReplaySubject for that:

    EDIT: Since RxJS 6.x, operators are applied through the pipe() method:

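    import { ReplaySubject } from 'rxjs';
    import { takeUntil } from 'rxjs/operators';

    class myComponent {
      // Emits once in ngOnDestroy(); each takeUntil(this.destroyed$) then completes its stream.
      private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);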
    
      constructor(
        private serviceA: ServiceA,
        private serviceB: ServiceB,
        private serviceC: ServiceC) {}
    
      ngOnInit() {
        this.serviceA
          .pipe(takeUntil(this.destroyed$))
          .subscribe(...);
        this.serviceB
          .pipe(takeUntil(this.destroyed$))
          .subscribe(...);
        this.serviceC
          .pipe(takeUntil(this.destroyed$))
          .subscribe(...);
      }
    
      ngOnDestroy() {
        this.destroyed$.next(true);
        this.destroyed$.complete();
      }
    }
    

    This is only valid for RxJS 5.x and older:

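    import { ReplaySubject } from 'rxjs/ReplaySubject';
    import 'rxjs/add/operator/takeUntil'; // patches takeUntil() onto Observable.prototype

    class myComponentOld {
      private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);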
    
      constructor(private serviceA: ServiceA) {}
    
      ngOnInit() {
        this.serviceA
          .takeUntil(this.destroyed$)
          .subscribe(...);
      }
    
      ngOnDestroy() {
        this.destroyed$.next(true);
        this.destroyed$.complete();
      }
    }
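
To try the pattern outside Angular, here is a minimal sketch assuming RxJS 6 or later. `DemoComponent`, `watch()`, `received` and `source$` are hypothetical names used only for illustration; the class has no decorator and the lifecycle methods are called by hand. It also suggests one plausible reason for choosing a ReplaySubject(1) over a plain Subject: a subscription created after ngOnDestroy() still sees the replayed value and ends immediately.

```typescript
import { ReplaySubject, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Hypothetical stand-in for a component: no @Component decorator,
// so it can run under plain Node without Angular.
class DemoComponent {
  private readonly destroyed$ = new ReplaySubject<boolean>(1);
  readonly received: number[] = [];

  constructor(private readonly source$: Subject<number>) {}

  ngOnInit(): void {
    this.watch();
  }

  // Every subscription created here ends once destroyed$ has emitted.
  watch(): void {
    this.source$
      .pipe(takeUntil(this.destroyed$))
      .subscribe(value => this.received.push(value));
  }

  ngOnDestroy(): void {
    this.destroyed$.next(true);
    this.destroyed$.complete();
  }
}

// Drive the lifecycle manually, the way Angular would.
const source$ = new Subject<number>();
const component = new DemoComponent(source$);

component.ngOnInit();
source$.next(1);
source$.next(2);

component.ngOnDestroy();
source$.next(3); // not recorded: takeUntil has already completed the stream

component.watch(); // late subscription: the replayed value ends it right away
source$.next(4); // not recorded either

console.log(component.received);
```

Only the values emitted between ngOnInit() and ngOnDestroy() should end up in `received`, and no Subscription objects need to be stored on the component.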