rxjspipesubject

Why does operator in piped Subject is called for every subscription?


I need to filter values from Subject and do some side-effects on returned data.
Something like this:

const subject2 = subject.pipe(
  filter((value: number) => {
    console.log(`filter: ${value}`);
    return value % 2 === 0; // filter even nubmers
  }),
  tap((value) => console.log(`after filter: ${value}`))
);

I see that function from filter() is called for every value emitted to subject2 subscribers (i.e. as many times as subject2 subscribers length). But I assumed that it will be called once for every next() call.

Also I see that if I subscribe to subject2 and pipe its values, no duplication appears.

Could someone please explain what's going on behind the scene and what is the correct pattern of filtering subject values?

Example on Stackblitz:
https://stackblitz.com/edit/typescript-e4stc4?devtoolsheight=100&file=index.ts


Solution

  • Behind the scenes, the Subject's next method is implemented like this:

    for (const observer of this.observers) {
      observer.next(value);
    }
    

    So each "observer" (or "subscriber") will get its own notification when you emit into the Subject. The operators are just functions that process the value before the result is passed to the observer.

    For example if you declared the operators like this:

    const myFilter = filter((value: number) => value % 2 === 0);
    const myTap = tap((value) => console.log(`after filter: ${value}`));
    

    Then the next function inside a custom Subject could be implemented like this:

    for (const observer of this.observers) {
      observer.next(myTap(myFilter(value)));
    }
    

    (This code wouldn't actually work - it's a simplification to show how the values reach the subscriber when you call next on a Subject)

    To solve your issue, you can reduce the number of observers to the source Subject by putting a share() as the last element of the chain like so:

    const subject2 = subject.pipe(
      filter((value: number) => {
        console.log(`filter: ${value}`);
        return value % 2 === 0; // filter even nubmers
      }),
      tap((value) => console.log(`after filter: ${value}`)),
      share()
    );
    

    share is implemented such that it acts as a single observer to the source Observable no matter how many observers are subscribed to it.