javascripttypescriptrxjstop-level-awaitreplaysubject

Deep diving into Rx.ReplaySubject: How to delay `next()`?


Ignoring Block: Either I'm wrong here, or the whiskey is starting to work. (I don't want to rule out that I'm going stupid either. Sorry for that.)

Expectation:

I would have expected ReplaySubject to return a single value every 2 seconds because I wait two seconds (each time) before I call next().

Result:

The result is that there is a wait of 2 seconds, but then all values are output simultaneously.

The code in question is here:

import { ReplaySubject } from 'rxjs';

export const rs$: ReplaySubject<number> = new ReplaySubject();

rs$.subscribe({
  next: (data) => console.log(data),
  error: (error) => console.warn(error),
  complete: () => console.log('ReplaySubject completed'),
});

const fakeAPIValuesOne: Array<number> = [7, 11, 13];

fakeAPIValuesOne.forEach(async (entry: number) => {
  await wait(2000);  // <--- Why does wait() not work?
  rs$.next(entry);
});

function wait(milliseconds: number) {
  return new Promise((resolve) => setTimeout(resolve, milliseconds));
}

Question:

What am I doing fundamentally wrong here?

If you want to try it out: https://stackblitz.com/edit/rxjs-wpnqvq?file=index.ts

EDIT 1:

SetTimeout also has no effect. The following code does exactly the same as above:

fakeAPIValuesOne.forEach((value: number) => {
  setTimeout(() => {
    rs$.next(value);
  }, 2000);
});

I wonder how next() can override all the delays here?

EDIT 2

The problem is solved, correct answer marked, thank you! You need the following details to run root level awaits for your ts-files.

package.json

Please notice the type section:

{
  "name": "playground",
  "version": "1.0.0",
  "description": "",
  "main": "index.ts",
  "scripts": {
    "start": "nodemon index.ts",
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "MIT",
  "dependencies": {
    "rxjs": "^7.5.5",
    "ts-node": "^10.7.0",
    "typescript": "^4.8.0-dev.20220507"
  },
  "type": "module"
}

nodemon.json

Please consider the following config to avoid the error: TypeError [ERR_UNKNOWN_FILE_EXTENSION]: Unknown file extension ".ts"

{
  "execMap": {
    "ts": "node --loader ts-node/esm"
  }
}

Last but not least tsconfig.json

{
  "compilerOptions": {
    "module": "ESNext",
    "target": "ESNext",
    "moduleResolution": "node",
    "esModuleInterop": true,
    "allowSyntheticDefaultImports": true,
    "isolatedModules": true,
    "noEmit": true,
    "strict": true,
    "lib": ["esnext", "DOM"]
  }
}

Solution

  • Following note is extracted from mozilla web docs here

    Note: forEach expects a synchronous function.

    forEach does not wait for promises. Make sure you are aware of the implications while using promises (or async functions) as forEach callback.

    So you this issue isn't related to the ReplaySubject, you just cannot use forEach for this use-case.

    Cheers

    EDIT: Solved

    import { ReplaySubject } from "rxjs";
    
    export const rs$ = new ReplaySubject();
    
    rs$.subscribe({
      next: (data) => console.log(data),
      error: (error) => console.warn(error),
      complete: () => console.log("ReplaySubject completed"),
    });
    
    const fakeAPIValuesOne = [7, 11, 13];
    
    // That won't work:
    // fakeAPIValuesOne.forEach(async (entry: number) => {
    //   await wait(2000);
    //   rs$.next(entry);
    // });
    
    
    // That will work
    for (const element of fakeAPIValuesOne) {
      await wait(2000);
      rs$.next(element);
    }
    
    function wait(milliseconds: number) {
      return new Promise((resolve) => setTimeout(resolve, milliseconds));
    }