javascriptrxjs

combineLatest input observable firing twice with different values


I am using RxJS to fire a paged API call. The API call fires either when a category is selected, or when the user changes pages. I would also like the category selection to reset the page state. Here is essentially what I'm doing:

const categorySelect$ = new Subject();
const pageSelect$ = new Subject();
const pageSizeSelect$ = new Subject();
const pageNum$ = merge(
  categorySelect$.pipe(map(() => 1)),
  pageSelect$
);
const page$ = combineLatest([pageNum$, pageSizeSelect$])
  .pipe(
    map(([pageNum, pageSize]) => ({ pageNum, pageSize })),
    shareReplay(1)
  );
const apiCall$ = combineLatest([categorySelect$, page$])
  .pipe(
     switchMap(([category, page]) => apiClient.getItems(category, page))
  );
apiCall$.subscribe();

The problem I'm having is that apiCall$ is getting two observations. The first observation is correct. I get the category that I clicked on. The second click refires the second observation then the first observation. If I swap the order of categorySelect$ and page$ the second observation fires second, but because of other code that runs here (not shown), that is still undesirable, and I would rather just have it work correctly in general.

I even tried changing the page$ input for apiCall$ to be pageSelect$.pipe(switchMap(() => page$ to ensure it only actually fires when the user clicks a page, but the problem persists. I also tried changing categorySelect$ to a ReplaySubject(1) and that also did not fix it. I have used this pattern before without this issue so I'm not sure what's happening.

I'm sure I could work around this by simply doing a tap(() => pageNum$.next(0)) but to me that seems to be an anti-pattern. I would prefer a solution that uses proper declarative composition.


Solution

  • Since the category subject is used in two substreams, the single emission of category, gives two emissions of the final stream, so we should promote category to the top and build our stream based on this.


    Since category is the controller of the state reset, it should be at the top of the stream.

    We can use map to define the object that holds the state.

    The switchMap is used along with combineLatest to listen for the subject changes of pageSize and pageNum, these emissions update the state object.

    I have used startWith, so that the stream does not wait for the two subject emissions to happen for the stream to flow, instead you can replace the subject with a BehaviourSubject which does the same thing (less code).

    Now, our state will have the latest values and we can simply call the API call with these values.

    import './style.css';
    
    import {
      Subject,
      merge,
      shareReplay,
      switchMap,
      combineLatest,
      map,
      of,
      startWith,
      tap,
    } from 'rxjs';
    
    const apiClient = {
      getItems(category: any, page: any) {
        console.log(category, page);
        return of([]);
      },
    };
    
    const categorySelect$ = new Subject();
    const pageSelect$ = new Subject();
    const pageSizeSelect$ = new Subject();
    
    const apiCall$ = categorySelect$.pipe(
      map((category: any) => ({ page: { pageNum: 1, pageSize: 10 }, category })),
      switchMap((state: any) => {
        return combineLatest([
          pageSelect$.pipe(
            startWith(1),
            tap((pageNum: any) => {
              state.page.pageNum = pageNum;
            })
          ),
          pageSizeSelect$.pipe(
            startWith(10),
            tap((pageSize: any) => {
              state.page.pageSize = pageSize;
            })
          ),
        ]).pipe(switchMap(() => apiClient.getItems(state.category, state.page)));
      })
    );
    apiCall$.subscribe(console.log);
    
    categorySelect$.next('qwerty');
    pageSizeSelect$.next(20);
    pageSelect$.next(2);
    
    categorySelect$.next('qwerty2');
    

    Stackblitz Demo