javascriptangularpromiserxjsrxjs-observables

How to repeat a promise conditionally using rxjs


I would like to repeat an API call which returns a Promise, conditionally using rxjs.

The API method receives an id which will be changed on every call by adding a counter prefix to it. The calls will be repeated until the data met some condition or the counter reach to a specific number X. How it can be done using rxjs?

API method:

fetchData(id):Promise<data>

try 1: fetchData(id)

try 2: fetchData(id_1)

try 3: fetchData(id_2)


Solution

  • IMO, it's better to handle the polling either through Promises or RxJS without mixing them. I'd illustrate using RxJS.

    Try the following

    1. Convert the promise to an observable using the RxJS from function.
    2. Use RxJS functions like timer or interval to regularly emit a value on a fixed interval.
    3. Use a higher order mapping operator like switchMap to map from the outer emission to your API call. Refer here for a brief description about different types of higher order mapping operators.
    4. Use two takeWhile operators, one for each of your condition respectively, to complete the subscription.
    5. Use filter operator to only forward the emissions that pass the condition.
    import { from } from 'rxjs';
    
    fetchData(id: any): Observable<any> {  // <-- return an observable
      return from(apiCall);                // <-- use `from` to convert Promise to Observable
    }
    
    import { timer } from 'rxjs';
    import { filter, switchMap, takeWhile } from 'rxjs/operators';
    
    timer(0, 5000).pipe(                        // <-- poll every 5 seconds
      takeWhile((index: number) => index < 20)  // <-- stop polling after 20 attempts
      switchMap((index: number) => 
        this.someService.apiCall(index+1)       // <-- first emission from `timer` is 0
      ),
      takeWhile(                                // <-- stop polling when a condition from the response is unmet
        (response: any) => response.someValue !== someOtherValue,
        true                                    // <-- emit the response that failed the test
      ),
      filter((response: any) => 
        response.someValue === someOtherValue   // <-- forward only emissions that pass the condition
      )
    ).subscribe({
      next: (response: any) => {
        // handle response
      },
      error: (error: any) => {
        // handle error
      }
    });
    

    Edit: The condition in the 2nd takeWhile was doing the opposite of the requirement. I've adjusted the condition and included the inclusive=true argument. Thanks @Siddhant in the comments.