Using RxJS 5 i want to solve the following:
Lets say that Im fetching a list of categories of sorts from a REST API.
Based on each and one of those categories, I want to fetch sub categories from another REST endpoint.
Then, based on each and one of those sub categories, I want to fetch products and for each and one of those products we need to fetch the detailed description.
This I have solved. The problem is that the ajax calls escalate and under just a minute over 30k calls are made which results in bringing the server to its knees.
Now, since this is a nightly job Im okay with it taking some time as long as it completes successfully.
This is what I have:
getCategories() // Wraps ajax call and returns payload with array of categories
.switchMap(categories => Observable.from(categories))
.mergeMap(category =>
getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
.catch(err => {
console.error('Error when fetching sub categories for category:', category);
console.error(err);
return Observable.empty();
})
)
.mergeMap(subCategories => Observable.from(subCategories))
.mergeMap(subCategory =>
getProducts(subCategory) // Wraps ajax call and returns payload with array of products
.catch(err => {
console.error('Error when fetching products for sub category:', subCategory);
console.error(err);
return Observable.empty();
})
)
.mergeMap(products => Observable.from(products))
.mergeMap(product =>
getProductDetails(product) // Wraps ajax call and returns payload with product details
.catch(err => {
console.error('Error when fetching product details for product:', product);
console.error(err);
return Observable.empty();
})
)
.mergeMap(productDetails => saveToDb(productDetails))
.catch(err => {
console.error(err);
})
.subscribe();
After the initial request where i fetch the categories, I want to:
Every call made to fetch sub categories should wait until the previous one is done. Only 5 ajax calls should be made at once when fetching products and those products details. After those 5 calls are done we trigger the next 5 calls etc.
Alternatively it could be controlled with time as in waiting x seconds before we do the next ajax call etc.
How would I go about to solve this in a nice with using RxJS based on my example above?
mergeMap
has an overload which takes an optional concurrency parameter. This you can utilize to control how many requests are being sent concurrently to your server.
public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable
.mergeMap(category =>
getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
.catch(err => {
console.error('Error when fetching sub categories for category:', category);
console.error(err);
return Observable.empty();
}),
null,
5 /* concurrency */
)
Also; mergeMap will convert anything observableLike to an Observables so if your getSubCategories()
returns an Array that will automatically be converted to an observable, no further Observable.from()
is needed.