Tags: angular, rxjs, rxjs-observables, behaviorsubject, fork-join

RxJS forkJoin operator doesn't emit anything


I have a problem using forkJoin. I have three Angular services, each responsible for making an HTTP call. Each service exposes an Observable; when the HTTP request receives a response, the service calls .next() on it with the response data. I'm sure these services work, because I previously combined them with combineLatest and they worked. Now I need the frontend not to update until all of the requests have received a response.

     const ob1$  = this.service1.ob1$;
     const ob2$ = this.service1.ob2$;
     const ob3$ = this.service1.ob3$;
     const sources = [ ob1$, ob2$, ob3$];

     forkJoin(sources).pipe(
       map(([ob1$, ob2$, ob3$]) => {
         return {
           ob1: ob1$.pipe(catchError(error => of(error))),
           ob2: ob2$.pipe(catchError(error => of(error))),
           ob3: ob3$.pipe(catchError(error => of(error))),
         };
       }),
       catchError((error) => of({ error }))
     )
     .subscribe((data) => { console.log(data) });

Example of a service:

     private _ob3$: BehaviorSubject<Object> = new BehaviorSubject<Object>({});

     public get ob3$(): Observable<any> {
       return this._ob3$.asObservable();
     }

     public set ob3(value: BehaviorSubject<Object>) {
       this._ob3$ = value;
     }

     // Fires the request and pushes the response into the BehaviorSubject.
     public getOb3(): Subscription {
       return this.http.get(`....`).pipe(
         map((data: any) => {
           this._ob3$.next(data);
           return data;
         })
       ).subscribe();
     }

I knew they worked, but to be sure I also tested each Observable individually:

     ob1$.subscribe({
       next: (data) => { console.log("ob1$", data) },
       error: (error) => { console.error(error) },
       complete: () => { console.log("complete") }
     })
     ob2$.subscribe({
       next: (data) => { console.log("ob2$", data) },
       error: (error) => { console.error(error) },
       complete: () => { console.log("complete") }
     })
     ob3$.subscribe({
       next: (data) => { console.log("ob3$", data) },
       error: (error) => { console.error(error) },
       complete: () => { console.log("complete") }
     })

Tested individually, they logged the correct data. With forkJoin, nothing was logged: neither data nor error messages.


Solution

  • forkJoin waits for all of the passed Observables to complete and then emits the last value from each of them. You are passing Observables derived from a BehaviorSubject, which never completes unless complete() is called on it. As a result, forkJoin never emits a value.

    You should use combineLatest instead, which emits once every passed Observable has emitted at least once.

    Alternatively, you can remove the BehaviorSubject and use the Observable from the HttpClient directly. Those Observables complete once the server has responded, so they work fine with forkJoin.
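The completion behaviour described above can be reproduced without Angular. The following is a minimal sketch, not the asker's actual services: the names s1$, s2$, s3$ and the string values are hypothetical, and of() is assumed here as a stand-in for an HttpClient request because it also emits and then completes.

```typescript
import { BehaviorSubject, forkJoin, of } from 'rxjs';

// Hypothetical stand-ins for the subjects held by the three services.
const s1$ = new BehaviorSubject<string>('init1');
const s2$ = new BehaviorSubject<string>('init2');
const s3$ = new BehaviorSubject<string>('init3');

// Case 1: sources backed by BehaviorSubjects.
const fromSubjects: string[][] = [];
forkJoin([s1$.asObservable(), s2$.asObservable(), s3$.asObservable()])
  .subscribe((values) => fromSubjects.push(values));

// A new value arrives, but no source has completed, so forkJoin stays silent.
s1$.next('response1');
const emittedBeforeComplete = fromSubjects.length;

// Only after every source has completed does forkJoin emit the last values.
s1$.complete();
s2$.complete();
s3$.complete();

// Case 2: sources that complete on their own, as HttpClient requests do.
// of() is an assumed stand-in for this.http.get(...).
const fromCompleting: string[][] = [];
forkJoin([of('r1'), of('r2'), of('r3')])
  .subscribe((values) => fromCompleting.push(values));

console.log(emittedBeforeComplete, fromSubjects, fromCompleting);
```

Calling complete() on the subjects is only there to demonstrate the rule; in a real service it would also end the stream for every other subscriber. The more practical route is probably the one from the answer: have each service return its request Observable and pass those to forkJoin.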