Tags: angular, rxjs, rxjs-pipeable-operators

Unsure how to reset stream based on previous values


I'm working on the following RxJS stream in an Angular app and I'm having trouble resetting its accumulated values. Several places in my application emit values to the three observables passed to combineLatest below, for example when a user changes a filter setting or updates the page via an input field. I also have a lazy-load function that advances the page when the user gets near the bottom.

When name or filter updates, I want the stream to return only the most recent data from getContent, but when the page observable emits a new value I want to combine the previous data with the new data via the scan operator. The problem is that I can't figure out the best way to do this within scan, because at that point it no longer knows the current and previous values of name and filter that went into mergeMap.

getContent(name, filter, page) {
    return this.http
      .get(
        `${this.API_BASE}/${name}/${filter}/${page}`
      );
}

The stream looks like the following:

this.results$ = combineLatest(
  this.dataService.getName(),
  this.dataService.getPage(),
  this.dataService.getFilter()
).pipe(
  mergeMap(([name, page, filter]) => {
     return this.dataService.getContent(name, filter, page);
  }),
  scan(
    (
      acc,
      curr
    ) => {
      this.nextPage = curr[curr.length - 1].id;
      if (acc.length && curr.length) {
         return acc.concat(curr);
      }

      return acc;
    },
    []
  )
);

The template is just a div that gets looped over and updated with the async pipe, which I'd like to keep if possible. Is there a better way to handle this within a single stream, or a way to break it apart so that it does what I need?


Solution

  • You can use combineLatest on your "criteria" (name and filter), then feed that into a switchMap, which resets the inner stream whenever the criteria change. Inside the switchMap, pipe the getPage() observable, so each page emission has the current criteria in scope. At that point, you have all 3 arguments needed to call getContent().

    Something like this should work for you:

      results$ = combineLatest([
        this.dataService.getName(),
        this.dataService.getFilter(),
      ]).pipe(
        switchMap(([name, filter]) => this.dataService.getPage().pipe(
          startWith(undefined), // initially emit `undefined` because there's no "next page" cursor for the first call
          mergeMap(page => this.dataService.getContent(name, filter, page)),
          scan((acc, curr) => {
            this.nextPage = curr[curr.length - 1]?.id; // `?.` guards against an empty page
      
            if (curr.length) {
              return acc.concat(curr);
            }
      
            return acc;
          }, [])
        )),
      );
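
    To make the reset behaviour concrete, here is a plain-TypeScript model of what the stream above emits. It is not RxJS and not a replacement for the pipeline: it swaps the operators for a single reducer over a list of events, and the StreamEvent type, the step function, and the sample data are all hypothetical. A `criteria` event plays the role of combineLatest emitting a new name/filter pair (where switchMap drops the old inner stream, and with it scan's accumulator), and a `page` event plays the role of one getContent() response.

```typescript
// Hypothetical item shape; only `id` is used by the original scan callback.
interface Item { id: string; }

// Hypothetical event model standing in for the two kinds of emissions.
type StreamEvent =
  | { kind: 'criteria'; name: string; filter: string } // name or filter changed
  | { kind: 'page'; items: Item[] };                   // one getContent() response

// A criteria change starts over from an empty list (what switchMap achieves
// by creating a fresh inner stream); a page response is appended (scan).
function step(acc: Item[], event: StreamEvent): Item[] {
  if (event.kind === 'criteria') {
    return [];
  }
  return event.items.length ? acc.concat(event.items) : acc;
}

const events: StreamEvent[] = [
  { kind: 'criteria', name: 'books', filter: 'new' },
  { kind: 'page', items: [{ id: '1' }, { id: '2' }] },
  { kind: 'page', items: [{ id: '3' }] },              // lazy load: appended
  { kind: 'criteria', name: 'books', filter: 'used' }, // filter changed: reset
  { kind: 'page', items: [{ id: '9' }] },
];

// Record the list after every event so each step can be inspected.
const snapshots: string[][] = [];
let current: Item[] = [];
for (const event of events) {
  current = step(current, event);
  snapshots.push(current.map(item => item.id));
}
```

    In the real stream a criteria change emits nothing by itself; the empty snapshot in this model only marks the point where the accumulated list is discarded. The next page response then starts a new list instead of being appended to the old one.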
    

    It's best if the observable stream doesn't rely on outside state (here, this.nextPage).

    We can get around this by changing the shape we emit. Instead of emitting only the results, the stream can emit an object that also includes the nextPage information:

      query$ = combineLatest([
        this.dataService.getName(),
        this.dataService.getFilter(),
      ]).pipe(
        switchMap(([name, filter]) => this.dataService.getPage().pipe(
          startWith(undefined),
          mergeMap(page => this.dataService.getContent(name, filter, page)),
          scan(
            (acc, curr) => ({ 
              results  : curr.length ? acc.results.concat(curr) : acc.results, 
              nextPage : curr[curr.length - 1]?.id
            }),
            { results: [] as Result[], nextPage: undefined })
        )),
      );

    The template then unwraps the emitted object once with the async pipe:

    <ng-container *ngIf="query$ | async as query">
    
      <div *ngFor="let result of query.results">
        {{ result.label }}
      </div>
    
      <button *ngIf="query.nextPage" (click)="loadMore(query.nextPage)"> 
        Load More 
      </button>
    
    </ng-container>
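
    Because the accumulator passed to scan is a pure function, it can be pulled out and checked without RxJS. The sketch below is a minimal version under some assumptions: the Result shape (id, label) is inferred from the snippets above, the id is assumed to be a string, and QueryState, emptyState, and accumulatePage are hypothetical names. Array.prototype.reduce stands in for scan here; it applies the accumulator the same way but yields only the final state, whereas scan emits every intermediate one.

```typescript
// Assumed shape of one item returned by getContent(); only `id` and
// `label` appear in the original snippets.
interface Result {
  id: string;
  label: string;
}

// Hypothetical name for the object the stream emits.
interface QueryState {
  results: Result[];
  nextPage: string | undefined;
}

const emptyState: QueryState = { results: [], nextPage: undefined };

// Same logic as the scan callback above: append the new page and keep
// the id of its last item as the cursor for the next request.
function accumulatePage(acc: QueryState, curr: Result[]): QueryState {
  return {
    results: curr.length ? acc.results.concat(curr) : acc.results,
    nextPage: curr[curr.length - 1]?.id, // undefined when the page is empty
  };
}

// Simulate three responses: two pages with data, then an empty one.
const pages: Result[][] = [
  [{ id: 'a1', label: 'First' }, { id: 'a2', label: 'Second' }],
  [{ id: 'a3', label: 'Third' }],
  [],
];

const finalState = pages.reduce(accumulatePage, emptyState);
```

    After the empty third page, finalState.results still holds the three items and finalState.nextPage is undefined, so the "Load More" button in the template would be hidden.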
    

    Here's a working StackBlitz demo.


    Two last notes:

    1. If the getPage(), getFilter(), & getName() methods don't take any params, you can simply declare them as observables, rather than methods that return observables: page$, filter$, name$

    2. The data service is exposing multiple observables, so the component can subscribe to them and feed the emissions back to the service's own getContent() method. Since your data service maintains all of the observable sources, it would probably be simpler to declare the content as an observable in the service instead of in the component.

    Check out this StackBlitz if you want to :-)