I've found a strange rxjs behavior that keeps bothering me, and I can't even imagine a solution to this problem.
I need to create an Observable from a Subject that will either emit an error if the consumed value is null, or emit the value. After that, I emit a null value from the Subject.
Everything works as intended, but the subscriber of this Observable keeps consuming the error that was emitted after the first subscription, even with the first() operator.
How is this possible?
import { Subject, throwError, of } from 'rxjs';
import { switchMap, tap, first } from 'rxjs/operators';
var subj$ = new Subject();
var obs$ = subj$.asObservable().pipe(
  switchMap((v) => {
    if (!v) {
      return throwError(() => 'ERROR');
    }
    return of(v);
  })
);
obs$
  .pipe(
    first(),
    tap({
      next(v) {
        console.log('obs$', v);
      },
      error(err) {
        console.error('obs$ error', err);
      },
    }),
    tap(() => subj$.next(null))
  )
  .subscribe();
subj$.next('VALUE');
// obs$ VALUE
// obs$ error ERROR <- ???
// ERROR <- ???
https://stackblitz.com/edit/stackblitz-starters-sdx1oqzx?file=index.js
Real-world example:
import { Observable, BehaviorSubject, zip, throwError, of } from 'rxjs';
import { first, switchMap, tap } from 'rxjs/operators';
interface User {
  id: number;
  name: string;
}
class AuthService {
  private isAuthenticatedSource = new BehaviorSubject<boolean>(false);
  private userSource = new BehaviorSubject<User | null>(null);
  isAuthenticated$: Observable<boolean> = this.isAuthenticatedSource.asObservable();
  user$: Observable<User> = zip(
    this.isAuthenticatedSource.asObservable(),
    this.userSource.asObservable(),
  ).pipe(
    switchMap(([ isAuthenticated, user ]) => {
      if (!isAuthenticated || !user) {
        return throwError(() => 'Unauthenticated.');
      }
      return of(user);
    })
  );
  constructor(private socketService: SocketService) {}
  authenticate(user: User) {
    this.isAuthenticatedSource.next(true);
    this.userSource.next(user);
    return this.socketService.emit('join', user);
  }
  // consumes the error from second emission
  deauthenticate() {
    return this.user$.pipe(
      first(),
      switchMap((user: User) => this.socketService.emit('leave', user)),
      tap(() => {
        this.isAuthenticatedSource.next(false);
        this.userSource.next(null);
      })
    );
  }
}
class SocketService {
  socket = {
    emit(event: string, data: any, ack: (res: any) => any) {
      return { event, data, ack };
    }
  }
  emit<T>(event: string, data: any): Observable<T> {
    return new Observable<T>(subscriber => {
      this.socket.emit(event, data, (res) => {
        subscriber.next(res);
        subscriber.complete();
      })
    });
  }
}
Looks like a bug that was fixed in v8.
Swap first with takeUntil and complete the subject in the tap. This mimics the behavior of first.
import { Subject, throwError, of } from 'rxjs';
import { switchMap, tap, takeUntil } from 'rxjs/operators';
var subj$ = new Subject();
var obs$ = subj$.asObservable().pipe(
  switchMap((v) => {
    if (!v) {
      return throwError(() => 'ERROR');
    }
    return of(v);
  })
);
const destroy$ = new Subject<void>();
obs$
  .pipe(
    takeUntil(destroy$),
    tap({
      next(v) {
        console.log('obs$', v);
      },
      error(err) {
        console.error('obs$ error', err);
      },
    }),
    tap(() => {
      destroy$.next();
      destroy$.complete();
    })
  )
  .subscribe();
subj$.next('VALUE');
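
For intuition about how the second emission in the question could reach the subscriber at all, here is a dependency-free toy model of synchronous, re-entrant emission. It is only a sketch: ToySubject, Listener and log are hypothetical names, it is not how RxJS is implemented, and it is an assumption that the library behaves analogously in the question's setup. The point is that if the null is pushed from inside the value handler, it is delivered before the teardown that follows the first value has run.

```typescript
// Toy model only: this is NOT RxJS's implementation, and every name is hypothetical.
type Listener<T> = (value: T) => void;

class ToySubject<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    // The returned function removes the listener again.
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  next(value: T): void {
    // Synchronous delivery to a snapshot of the current listeners.
    for (const listener of [...this.listeners]) listener(value);
  }
}

const log: string[] = [];
const source = new ToySubject<string | null>();

// "Take one value" consumer: it handles the value first and tears down afterwards.
const unsubscribe: () => void = source.subscribe((value) => {
  if (value === null) {
    log.push('error ERROR'); // stands in for the throwError branch
    return;
  }
  log.push(`next ${value}`);
  source.next(null); // re-entrant emission, like tap(() => subj$.next(null))
  unsubscribe(); // teardown runs only after the re-entrant call has returned
});

source.next('VALUE');
console.log(log); // logs both entries: 'next VALUE', then 'error ERROR'
```

In this model, moving the source.next(null) call after unsubscribe() leaves only 'next VALUE' in the log, which is the ordering the takeUntil version above relies on when destroy$ fires before anything else is pushed.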