I am using redux-oservable and am having an issue with timing on my observables. I want to introduce a delay between the emission of values within an array.
The epic that I'm working with looks like this at the moment:
export const fileImportEpi = (
action$: any,
state$: any,
{ apiClient }: IDependencies
) =>
action$.pipe(
ofType(FILE_IMPORT),
withLatestFrom(state$),
mergeMap(([{ payload }]: any) => { // Input is a 2 element array of objects, where the first object contains the payload key
const { id, file, config = {} } = payload
const headers = {
'Content-Type': 'application/zip',
Filename: file.name,
}
return from(
apiClient.post(('import', file, {
...config,
headers,
})
).pipe(
mapTo(fileSuccessAction(id)),
catchError((error) => {
return of(
apiErrorAction(error.response ? error.response.data : error),
fileAttrsUpdateAction(id, {
error: error.response ? error.response.data : error,
}),
fileFailAction(id)
)
}) // Catch
) // Pipe
}) // MergeMap
)
The issue is that is basically launches all of the POST requests immediately, and this is causing issues in the backend. What I would like to do is introduce a delay between the emission of each post request (let's say 20ms).
I can't seem to make this work though. Wherever I introduce the delay is seems to just launch all of the post requests sooner or later, however never with delay between each request. How could I go about doing this?
(You could argue that the issue should be fixed in the backend however that is not really an option at the moment.)
Edit:
To show another method I have also tried:
export const fileImportEpic = (
action$: any,
state$: any,
{ fpiClient }: IDependencies
) =>
action$.pipe(
ofType(FILE_IMPORT),
withLatestFrom(state$),
mergeMap(([{ payload }]: any) => {
// mergeMap(([{ payload }]: any) => { // Input looks like: [{payload: obj, otheKeys:...}, {type:val, otherKeys:...}] (Ie. a 2 element array where the first element contains the payload)
const { id, file, config = {} } = payload
const headers = {
'Content-Type': 'application/zip',
Filename: file.name,
}
const pl = [payload]
// Try to release each part of the payload with a delay in between, instead of returning a post request promise
return from(pl).pipe(
delay(3000),
mergeMap((payload: any) => {
const { id, file, config = {} } = payload
const headers = {
'Content-Type': 'application/zip',
Filename: file.name,
}
return from(
apiClient.post('archives/import', file, {
...config,
headers,
})
).pipe(
mapTo(fileSuccessAction(id)),
catchError((error) => {
return of(
apiErrorAction(error.response ? error.response.data : error),
fileAttrsUpdateAction(id, {
error: error.response ? error.response.data : error,
}),
fileFailAction(id)
)
}) // Catch
)
})
) // Pipe
}) // MergeMap 1
)
change mergeMap
to concatMap
const { of, from, delay, concatMap } = rxjs;
const api = (id) => fetch(`https://jsonplaceholder.typicode.com/todos/${id}`).then((res) => res.json());
of(1, 2, 3, 4, 5).pipe(
concatMap((id) => from(api(id)).pipe(delay(1000)))
).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.6/rxjs.umd.min.js"></script>