reactjsreduxrxjsredux-observable

Delay time between Observable values - RxJS


I am using redux-oservable and am having an issue with timing on my observables. I want to introduce a delay between the emission of values within an array.

The epic that I'm working with looks like this at the moment:

export const fileImportEpi = (
  action$: any,
  state$: any,
  { apiClient }: IDependencies
) =>
  action$.pipe(
    ofType(FILE_IMPORT),
    withLatestFrom(state$),
    mergeMap(([{ payload }]: any) => { // Input is a 2 element array of objects, where the first object contains the payload key
      const { id, file, config = {} } = payload
      const headers = {
        'Content-Type': 'application/zip',
        Filename: file.name,
      }

      return from(
        apiClient.post(('import', file, {
          ...config,
          headers,
        })
      ).pipe(
        mapTo(fileSuccessAction(id)),
        catchError((error) => {
          return of(
            apiErrorAction(error.response ? error.response.data : error),
            fileAttrsUpdateAction(id, {
              error: error.response ? error.response.data : error,
            }),
            fileFailAction(id)
          )
        }) // Catch
      ) // Pipe
    }) // MergeMap
  )

The issue is that is basically launches all of the POST requests immediately, and this is causing issues in the backend. What I would like to do is introduce a delay between the emission of each post request (let's say 20ms).

I can't seem to make this work though. Wherever I introduce the delay is seems to just launch all of the post requests sooner or later, however never with delay between each request. How could I go about doing this?

(You could argue that the issue should be fixed in the backend however that is not really an option at the moment.)

Edit:

To show another method I have also tried:

export const fileImportEpic = (
  action$: any,
  state$: any,
  { fpiClient }: IDependencies
) =>
  action$.pipe(
    ofType(FILE_IMPORT),
    withLatestFrom(state$),

    mergeMap(([{ payload }]: any) => {
      // mergeMap(([{ payload }]: any) => { // Input looks like: [{payload: obj, otheKeys:...}, {type:val, otherKeys:...}] (Ie. a 2 element array where the first element contains the payload)

      const { id, file, config = {} } = payload
      const headers = {
        'Content-Type': 'application/zip',
        Filename: file.name,
      }
      const pl = [payload]

      // Try to release each part of the payload with a delay in between, instead of returning a post request promise
      return from(pl).pipe(
        delay(3000),
        mergeMap((payload: any) => {
          const { id, file, config = {} } = payload
          const headers = {
            'Content-Type': 'application/zip',
            Filename: file.name,
          }

          return from(
            apiClient.post('archives/import', file, {
              ...config,
              headers,
            })
          ).pipe(
            mapTo(fileSuccessAction(id)),
            catchError((error) => {
              return of(
                apiErrorAction(error.response ? error.response.data : error),
                fileAttrsUpdateAction(id, {
                  error: error.response ? error.response.data : error,
                }),
                fileFailAction(id)
              )
            }) // Catch
          )
        })
      ) // Pipe
    }) // MergeMap 1
  )

Solution

  • change mergeMap to concatMap

    const { of, from, delay, concatMap } = rxjs;
    
    const api = (id) => fetch(`https://jsonplaceholder.typicode.com/todos/${id}`).then((res) => res.json());
    
    of(1, 2, 3, 4, 5).pipe(
      concatMap((id) => from(api(id)).pipe(delay(1000)))
    ).subscribe(console.log);
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.6/rxjs.umd.min.js"></script>