I need to implement HTTP Longpolling using rxjs (in an Angular environment). The challenge is that I need to change the request parameters on every call. To be more specific: I need to change the from/to values to have it rolling. For example always go back 1 minute in time from the current time.
Right now I have the following implementation:
const requestParameter$ = new BehaviorSubject<MetricAggregationParams>(initialRequestParameter);
this.metricsService
.searchAggregatedMetrics(requestParameter$)
.pipe(
tap((metricInstancesResult) => {
// do something with the result
}),
delay(3000),
tap(() => {
requestParameter$.next({
...requestParameter$.value,
from: DateTime.now()
.minus({ minutes: timerange.timerangeInMinutes as number })
.toISO(),
to: DateTime.now().toISO()
});
})
)
.subscribe();
})
searchAggregatedMetrics(requestParameter$: BehaviorSubject<MetricAggregationParamsDto>) {
return requestParameter$.pipe(
concatMap((requestParameter) =>
this.http.post<MetricAggregationResult>(`REST_URL`, requestParameter)
)
);
}
Here are some constraints:
Is there a way to have the longpolling logic all together in the searchAggregatedMetrics method?
If I understand the problem right, you are facing some kind of recursive problem. Recursion in rxJs is addressed with the expand
operator.
One solution to your problem could be along these lines
restService(0).pipe(
delay(300),
map(resp => {
console.log("do stuff with resp " + resp)
}),
map(() => {
console.log("prepare next input " + counter++)
return counter
}),
// expand is an operator that takes as input a function that
// returns an Observable
expand(input => {
return restService(counter).pipe(
delay(300),
map(resp => {
console.log("do stuff with resp " + resp)
}),
map(() => {
console.log("prepare next input " + counter++)
return counter
}),
)
}),
// we use take just to finish the iteration after a certain number of calls
take(10)
).subscribe()
// counter used to simulate the fact that we change input at every call
let counter = 0
// function to simulate a rest service call
function restService(input: number) {
return of(input).pipe(
tap(input => console.log("Input received " + input)),
map(input => {
const resp = "response " + input
return resp
})
)
}
There is clearly a repetition in the above code, which can therefore coded a bit more elegantly (but maybe less clearly) like this
function restServiceOnSteroids(input: number) {
return restService(input).pipe(
delay(300),
map(resp => {
console.log("do stuff with resp " + resp)
}),
map(() => {
console.log("prepare next input " + counter++)
return counter
})
)
}
restServiceOnSteroids(counter).pipe(
expand(input => restServiceOnSteroids(input)),
take(7)
).subscribe()
Here a stackblitz that reproduces this solution