angular rxjs long-polling

HTTP Longpolling with changing request parameters using rxjs


I need to implement HTTP long polling using RxJS (in an Angular environment). The challenge is that the request parameters have to change on every call. More specifically, the from/to values must form a rolling window: for example, every request should cover the last minute before the current time.

Right now I have the following implementation:

const requestParameter$ = new BehaviorSubject<MetricAggregationParams>(initialRequestParameter);

this.metricsService
  .searchAggregatedMetrics(requestParameter$)
  .pipe(
    tap((metricInstancesResult) => {
      // do something with the result
    }),
    delay(3000),
    tap(() => {
      requestParameter$.next({
        ...requestParameter$.value,
        from: DateTime.now()
          .minus({ minutes: timerange.timerangeInMinutes as number })
          .toISO(),
        to: DateTime.now().toISO()
      });
    })
  )
  .subscribe();

searchAggregatedMetrics(requestParameter$: BehaviorSubject<MetricAggregationParamsDto>) {
  return requestParameter$.pipe(
    concatMap((requestParameter) =>
      this.http.post<MetricAggregationResult>(`REST_URL`, requestParameter)
    )
  );
}

Here are some constraints:

Is there a way to have the longpolling logic all together in the searchAggregatedMetrics method?


Solution

  • If I understand the problem correctly, you are facing a kind of recursive problem: each call is triggered once the previous one has returned. Recursion in RxJS is addressed with the expand operator.
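
As a minimal illustration of expand on its own, assuming RxJS 6 or later: every value that comes out is fed back into the projection function, so one seed value turns into a whole sequence. The `emitted` array is only there to make the result visible.

```typescript
import { of } from 'rxjs';
import { expand, take } from 'rxjs/operators';

// Collects everything the stream emits so it can be inspected afterwards.
const emitted: number[] = [];

of(1).pipe(
  // Each emitted value is passed back in and produces the next Observable.
  expand((value) => of(value * 2)),
  // Without take (or an EMPTY return) the recursion would never stop.
  take(5),
).subscribe((value) => emitted.push(value));

console.log(emitted); // [1, 2, 4, 8, 16]
```

Because `of` emits synchronously, the array is already filled when `console.log` runs; with a real HTTP call each step would arrive asynchronously.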

    One solution to your problem could be along these lines:

    import { of } from 'rxjs'
    import { delay, expand, map, take, tap } from 'rxjs/operators'

    // counter used to simulate the fact that we change input at every call
    let counter = 0

    // function to simulate a rest service call
    function restService(input: number) {
      return of(input).pipe(
        tap(input => console.log("Input received " + input)),
        map(input => "response " + input)
      )
    }

    restService(counter).pipe(
      delay(300),
      // tap is meant for side effects and passes the response through unchanged
      tap(resp => console.log("do stuff with resp " + resp)),
      // the value returned here becomes the input of the next call
      map(() => {
        counter++
        console.log("prepare next input " + counter)
        return counter
      }),
      // expand is an operator that takes as input a function that
      // returns an Observable; it is invoked again for every emitted value
      expand(input => {
        return restService(input).pipe(
          delay(300),
          tap(resp => console.log("do stuff with resp " + resp)),
          map(() => {
            counter++
            console.log("prepare next input " + counter)
            return counter
          }),
        )
      }),
      // we use take just to finish the iteration after a certain number of calls
      take(10)
    ).subscribe()
    

    There is clearly repetition in the above code, which can therefore be written a bit more elegantly (but maybe less clearly) like this:

    // one polling step: call the service, wait, handle the response
    // and return the input for the next step
    function restServiceOnSteroids(input: number) {
      return restService(input).pipe(
        delay(300),
        tap(resp => console.log("do stuff with resp " + resp)),
        map(() => {
          counter++
          console.log("prepare next input " + counter)
          return counter
        })
      )
    }
    
    restServiceOnSteroids(counter).pipe(
      expand(input => restServiceOnSteroids(input)),
      take(7)
    ).subscribe()
    

    Here is a StackBlitz that reproduces this solution.
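
Applied to the rolling from/to window from the question, the same expand idea could be sketched roughly as follows. This is only a sketch under assumptions: `WindowParams`, `rollingWindow`, `pollRolling` and the injectable `clock` are hypothetical names, native `Date` stands in for the luxon `DateTime` used in the question, and `fakeRequest` replaces the real `this.http.post(...)` call.

```typescript
import { Observable, defer, of, timer } from 'rxjs';
import { expand, switchMap, take } from 'rxjs/operators';

// Hypothetical request shape: only the two fields that roll forward.
interface WindowParams {
  from: string;
  to: string;
}

// Builds a window that ends at `now` and starts `minutes` earlier.
function rollingWindow(now: Date, minutes: number): WindowParams {
  return {
    from: new Date(now.getTime() - minutes * 60000).toISOString(),
    to: now.toISOString(),
  };
}

// Sends one request, then waits `delayMs` after each response before sending
// the next one with a freshly computed window. `request` stands in for the
// HTTP call and `clock` for the current time, so both can be replaced.
function pollRolling<R>(
  request: (params: WindowParams) => Observable<R>,
  minutes: number,
  delayMs: number,
  clock: () => Date = () => new Date(),
): Observable<R> {
  const call = (): Observable<R> => request(rollingWindow(clock(), minutes));
  // defer postpones the first call (and its timestamp) until subscription.
  return defer(call).pipe(
    expand(() => timer(delayMs).pipe(switchMap(call))),
  );
}

// Usage with a fake request; the short delay just keeps the demo quick.
const fakeRequest = (params: WindowParams) =>
  of(`metrics from ${params.from} to ${params.to}`);

pollRolling(fakeRequest, 1, 100)
  .pipe(take(3))
  .subscribe((result) => console.log(result));
```

Inside searchAggregatedMetrics, the request callback would presumably wrap the existing `this.http.post<MetricAggregationResult>(...)` call with a delay of 3000, which keeps the polling logic in one method and makes the BehaviorSubject unnecessary. An HTTP error ends the stream unless it is caught, and unsubscribing stops the polling.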