javascript rxjs

Why does a Subject's subscriber still receive an error after the first() operator when the Subject's next emission is triggered inside a downstream tap() operator?


I've found a strange RxJS behavior that keeps bothering me, and I can't work out a solution to it.

I need to create an Observable from a Subject that emits an error if the incoming value is null and otherwise emits the value. Once the first value has been received, I then emit null from the Subject. Everything works as intended, except that the subscriber of this Observable still receives the error produced by that second emission, even though first() is in the pipe.

How is this possible?

import { Subject, throwError, of } from 'rxjs';
import { switchMap, tap, first } from 'rxjs/operators';

var subj$ = new Subject();

var obs$ = subj$.asObservable().pipe(
  switchMap((v) => {
    if (!v) {
      return throwError(() => 'ERROR');
    }

    return of(v);
  })
);

obs$
  .pipe(
    first(),
    tap({
      next(v) {
        console.log('obs$', v);
      },
      error(err) {
        console.error('obs$ error', err);
      },
    }),
    tap(() => subj$.next(null))
  )
  .subscribe();

subj$.next('VALUE');

// obs$ VALUE
// obs$ error ERROR <- ???
// ERROR <- ???

https://stackblitz.com/edit/stackblitz-starters-sdx1oqzx?file=index.js

Real world example:

import { Observable, BehaviorSubject, zip, throwError, of } from 'rxjs';
import { first, switchMap, tap } from 'rxjs/operators';

type User = {
  id: number;
  name: string;
};

class AuthService {
  private isAuthenticatedSource = new BehaviorSubject<boolean>(false);
  private userSource = new BehaviorSubject<User | null>(null);

  isAuthenticated$: Observable<boolean> = this.isAuthenticatedSource.asObservable();

  user$: Observable<User> = zip(
    this.isAuthenticatedSource.asObservable(),
    this.userSource.asObservable(),
  ).pipe(
    switchMap(([ isAuthenticated, user ]) => {
      if (!isAuthenticated || !user) {
        return throwError(() => 'Unauthenticated.');
      }

      return of(user);
    })
  );

  constructor(private socketService: SocketService) {}

  authenticate(user: User) {
    this.isAuthenticatedSource.next(true);
    this.userSource.next(user);

    return this.socketService.emit('join', user);
  }

  // The subscriber also receives the error caused by the emissions in tap() below
  deauthenticate() {
    return this.user$.pipe(
      first(),
      switchMap((user: User) => this.socketService.emit('leave', user)),
      tap(() => {
        this.isAuthenticatedSource.next(false);
        this.userSource.next(null);
      })
    );
  }
}

class SocketService {
  socket = {
    emit(event: string, data: any, ack: (res: any) => any) {
      return { event, data, ack };
    }
  }

  emit<T>(event: string, data: any): Observable<T> {
    return new Observable<T>(subscriber => {
      this.socket.emit(event, data, (res) => {
        subscriber.next(res);
        subscriber.complete();
      })
    });
  }
}

Solution

  • This looks like a bug in RxJS 7 that was fixed in v8.

    Stackblitz - 8.0.0-alpha.12 (Working)

    Stackblitz - 7.8.2 (Not Working)

    Workaround:

    Replace first() with takeUntil(destroy$), then emit on and complete the destroy$ subject inside the tap(). This mimics the behavior of first().

    import { Subject, throwError, of } from 'rxjs';
    import { switchMap, tap, takeUntil } from 'rxjs/operators';
    
    var subj$ = new Subject();
    
    var obs$ = subj$.asObservable().pipe(
      switchMap((v) => {
        if (!v) {
          return throwError(() => 'ERROR');
        }
    
        return of(v);
      })
    );
    
    const destroy$ = new Subject<void>();
    obs$
      .pipe(
        takeUntil(destroy$),
        tap({
          next(v) {
            console.log('obs$', v);
          },
          error(err) {
            console.error('obs$ error', err);
          },
        }),
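        tap(() => {
          // Completing via destroy$ tears the pipeline down first...
          destroy$.next();
          destroy$.complete();
          // ...so the re-emission from the question should no longer reach this subscriber.
          subj$.next(null);
        })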
      )
      .subscribe();
    
    subj$.next('VALUE');
    

    Stackblitz Demo - Workaround
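
A possible explanation for the 7.x output is ordering: first() seems to pass the value downstream before it completes, so a synchronous subj$.next(null) issued from the downstream tap() arrives while the pipeline is still open. The sketch below models only that ordering and uses no RxJS at all. TinySubject, firstNonNull, and the stopBeforeEmit flag are hypothetical names made up for illustration, not RxJS APIs or RxJS source code.

```typescript
// Hypothetical, dependency-free model of the ordering; not RxJS source code.
type Downstream<T> = {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
};

// Tiny stand-in for a Subject: synchronous and multicast.
class TinySubject<T> {
  private listeners: Array<(value: T) => void> = [];

  subscribe(listener: (value: T) => void): () => void {
    this.listeners.push(listener);
    // The returned function removes the listener again.
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  next(value: T): void {
    // Iterate over a copy so listeners may unsubscribe during delivery.
    for (const listener of [...this.listeners]) {
      listener(value);
    }
  }
}

// Models "turn null into an error, otherwise take the first value".
// stopBeforeEmit selects which of the two orderings is used.
function firstNonNull<T>(
  source: TinySubject<T | null>,
  downstream: Downstream<T>,
  stopBeforeEmit: boolean
): void {
  let stopped = false;
  const unsubscribe = source.subscribe((value) => {
    if (stopped) return;
    if (value === null) {
      stopped = true;
      unsubscribe();
      downstream.error('ERROR');
      return;
    }
    if (stopBeforeEmit) {
      // Close first, then emit: a re-entrant value finds no listener.
      stopped = true;
      unsubscribe();
      downstream.next(value);
      downstream.complete();
    } else {
      // Emit first, then close: the pipeline is still open during next().
      downstream.next(value);
      if (!stopped) {
        stopped = true;
        unsubscribe();
        downstream.complete();
      }
    }
  });
}

function run(stopBeforeEmit: boolean): string[] {
  const log: string[] = [];
  const subj = new TinySubject<string | null>();
  firstNonNull<string>(
    subj,
    {
      next(value) {
        log.push(`next ${value}`);
        subj.next(null); // re-entrant emission, like tap(() => subj$.next(null))
      },
      error(err) {
        log.push(`error ${String(err)}`);
      },
      complete() {
        log.push('complete');
      },
    },
    stopBeforeEmit
  );
  subj.next('VALUE');
  return log;
}

const emitThenClose = run(false); // ['next VALUE', 'error ERROR']
const closeThenEmit = run(true); // ['next VALUE', 'complete']
console.log(emitThenClose, closeThenEmit);
```

In this model, the emit-then-close ordering reproduces the value followed by the error seen in the question. Closing before the re-entrant emission is delivered, which is what the takeUntil workaround effectively arranges, leaves only the value and a completion.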