javascriptnode.jsrxjsrxjs5rxjs-dom

How to control pressure of multiple ajax calls with RxJS


Using RxJS 5 i want to solve the following:

Lets say that Im fetching a list of categories of sorts from a REST API.

Based on each and one of those categories, I want to fetch sub categories from another REST endpoint.

Then, based on each and one of those sub categories, I want to fetch products and for each and one of those products we need to fetch the detailed description.

This I have solved. The problem is that the ajax calls escalate and under just a minute over 30k calls are made which results in bringing the server to its knees.

Now, since this is a nightly job Im okay with it taking some time as long as it completes successfully.

This is what I have:

getCategories() // Wraps ajax call and returns payload with array of categories
      .switchMap(categories => Observable.from(categories))
      .mergeMap(category => 
        getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
          .catch(err => {
            console.error('Error when fetching sub categories for category:', category);
            console.error(err);
            return Observable.empty();
          })
      )
      .mergeMap(subCategories => Observable.from(subCategories))
      .mergeMap(subCategory => 
        getProducts(subCategory) // Wraps ajax call and returns payload with array of products
        .catch(err => {
          console.error('Error when fetching products for sub category:', subCategory);
          console.error(err);
          return Observable.empty();
        })
      )
      .mergeMap(products => Observable.from(products))
      .mergeMap(product => 
        getProductDetails(product) // Wraps ajax call and returns payload with product details
        .catch(err => {
          console.error('Error when fetching product details for product:', product);
          console.error(err);
          return Observable.empty();
        })
      )
      .mergeMap(productDetails => saveToDb(productDetails))
      .catch(err => {
        console.error(err);
      })
      .subscribe();

After the initial request where i fetch the categories, I want to:

Every call made to fetch sub categories should wait until the previous one is done. Only 5 ajax calls should be made at once when fetching products and those products details. After those 5 calls are done we trigger the next 5 calls etc.

Alternatively it could be controlled with time as in waiting x seconds before we do the next ajax call etc.

How would I go about to solve this in a nice with using RxJS based on my example above?


Solution

  • mergeMap has an overload which takes an optional concurrency parameter. This you can utilize to control how many requests are being sent concurrently to your server.

    public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable

      .mergeMap(category => 
        getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
          .catch(err => {
            console.error('Error when fetching sub categories for category:', category);
            console.error(err);
            return Observable.empty();
          }),
        null, 
        5 /* concurrency */
      )
    

    Also; mergeMap will convert anything observableLike to an Observables so if your getSubCategories() returns an Array that will automatically be converted to an observable, no further Observable.from() is needed.