rxjsrxjs-observables

how to unsubscribe a RXJS subscription inside the subscribe method?


I have some javascript:

this.mySubscription = someObservable.subscribe((obs: any) => {
   this.mySubscription.unsubscribe();
   this.mySubscription = undefined;
}

on execution, the console logs the error ERROR TypeError: Cannot read property 'unsubscribe' of undefined. I wonder why I can not unsubscribe inside the subscribe lambda function. Is there a correct way to do so? I have read a bit about using dummy-subjects and completing them or using takeUntil/takeWhile and other pipe operators workArounds.

What is a correct way/workaround to unsubscribe a subscription inside the subscription's subscribe-function?

I am currently using a dummy subscription like so:

mySubscription: BehaviorSubject<any> = new BehaviorSubject<any>(undefined);


// when I do the subscription:
dummySubscription: BehaviorSubject<any> = new BehaviourSubject<any>(this.mySubscription.getValue());
this.mySubscription = someObservable.subscribe((obs: any) => {
    // any work...
    dummySubscription.next(obs);
    dummySubscription.complete();
    dummySubscription = undefined;
}, error => {
    dummySubscription.error(error);
});

dummySubscription.subscribe((obs: any) => {
    // here the actual work to do when mySubscription  emits a value, before it should have been unsubscribed upon
}, err => {
    // if errors need be
});

Solution

  • You shouldn't try to unsubscribe in the subscribe function.
    You can unsubscribe with operators like take, takeWhile or takeUntil.

    take

    Use take(n) to unsubscribe after someObservable emits n times.

    someObservable.pipe(
      take(1)
    ).subscribe(value => console.log(value));
    

    takeWhile

    Use takeWhile to unsubscribe when an emitted value fails a condition.

    someObservable.pipe(
      takeWhile(value => valueIsSave(value))
    ).subscribe(value => console.log(value));
    
    valueIsSave(value): boolean {
      // return true if the subscription should continue
      // return false if you want to unsubscribe on that value
    }
    

    takeUntil

    Use takeUntil(obs$) to unsubscribe when the observable obs$ emits.

    const terminate = new Subject();
    
    someObservable.pipe(
      takeUntil(terminate)
    ).subscribe(value => console.log(value));
    
    unsub() { 
      terminate.next() // trigger unsubscribe
    }