I need to filter values from a Subject and perform some side effects on the values that pass the filter.
Something like this:
const subject2 = subject.pipe(
filter((value: number) => {
console.log(`filter: ${value}`);
return value % 2 === 0; // keep even numbers
}),
tap((value) => console.log(`after filter: ${value}`))
);
I see that the function passed to filter() is called once per subject2 subscriber for every emitted value (i.e. as many times as there are subject2 subscribers). But I assumed that it would be called once for every next() call.
I also see that if I subscribe to subject2 and pipe its values, no duplication appears.
Could someone please explain what's going on behind the scenes, and what the correct pattern for filtering Subject values is?
Example on Stackblitz:
https://stackblitz.com/edit/typescript-e4stc4?devtoolsheight=100&file=index.ts
Behind the scenes, the Subject's next method is implemented roughly like this:
for (const observer of this.observers) {
observer.next(value);
}
So each "observer" (or "subscriber") gets its own notification when you emit into the Subject. The operators are just functions that process the value before the result is passed to the observer, so they run once per observer.
For example if you declared the operators like this:
const myFilter = filter((value: number) => value % 2 === 0);
const myTap = tap((value) => console.log(`after filter: ${value}`));
Then the next function inside a custom Subject could be implemented like this:
for (const observer of this.observers) {
observer.next(myTap(myFilter(value)));
}
(This code wouldn't actually work. It's a simplification to show how the values reach the subscriber when you call next on a Subject.)
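To make that concrete, here is a minimal runnable sketch of the same idea. It does not use RxJS: TinySubject, TinySource, TinyObserver and tinyFilter are hypothetical stand-ins written only for this illustration, and they leave out errors, completion and unsubscription. The point it shows is that every subscription to the filtered source registers its own observer on the subject, so the predicate runs once per subscriber for each next() call.

```typescript
// Hypothetical, simplified model; none of these names are RxJS APIs.
type TinyObserver<T> = (value: T) => void;

interface TinySource<T> {
  subscribe(observer: TinyObserver<T>): void;
}

class TinySubject<T> implements TinySource<T> {
  private observers: TinyObserver<T>[] = [];

  subscribe(observer: TinyObserver<T>): void {
    this.observers.push(observer);
  }

  next(value: T): void {
    // Each registered observer gets its own notification.
    for (const observer of this.observers) {
      observer(value);
    }
  }
}

// Each subscribe() on the returned source registers a new observer on the
// upstream source, so the predicate sits in front of every subscriber.
function tinyFilter<T>(
  source: TinySource<T>,
  predicate: (value: T) => boolean
): TinySource<T> {
  return {
    subscribe(observer: TinyObserver<T>): void {
      source.subscribe((value: T) => {
        if (predicate(value)) {
          observer(value);
        }
      });
    },
  };
}

let predicateCalls = 0;
const tinySubject = new TinySubject<number>();
const evens = tinyFilter(tinySubject, (value: number) => {
  predicateCalls++;
  return value % 2 === 0;
});

const receivedA: number[] = [];
const receivedB: number[] = [];
evens.subscribe((value: number) => receivedA.push(value));
evens.subscribe((value: number) => receivedB.push(value));

tinySubject.next(1);
tinySubject.next(2);

// Two values times two subscribers: the predicate ran four times.
console.log(predicateCalls, receivedA, receivedB);
```

With two subscribers and two next() calls, the predicate runs four times, which matches the duplicated "filter: ..." log lines described in the question.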
To solve your issue, you can reduce the number of observers on the source Subject to one by putting share() as the last operator in the chain, like so:
const subject2 = subject.pipe(
filter((value: number) => {
console.log(`filter: ${value}`);
return value % 2 === 0; // keep even numbers
}),
tap((value) => console.log(`after filter: ${value}`)),
share()
);
share is implemented such that it acts as a single observer of the source Observable no matter how many observers are subscribed to it, so filter and tap run once per emitted value.
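The "single observer" idea can be sketched without RxJS. In the block below, TinySubject, TinySource, TinyObserver, tinyFilter and tinyShare are hypothetical names made up for this illustration, not library APIs, and tinyShare is only a rough stand-in: the real share() also handles reference counting, errors, completion and resetting, all of which are omitted here.

```typescript
// Hypothetical, simplified model; none of these names are RxJS APIs.
type TinyObserver<T> = (value: T) => void;

interface TinySource<T> {
  subscribe(observer: TinyObserver<T>): void;
}

class TinySubject<T> implements TinySource<T> {
  private observers: TinyObserver<T>[] = [];

  subscribe(observer: TinyObserver<T>): void {
    this.observers.push(observer);
  }

  next(value: T): void {
    for (const observer of this.observers) {
      observer(value);
    }
  }
}

// Every subscribe() here adds one more observer to the upstream source.
function tinyFilter<T>(
  source: TinySource<T>,
  predicate: (value: T) => boolean
): TinySource<T> {
  return {
    subscribe(observer: TinyObserver<T>): void {
      source.subscribe((value: T) => {
        if (predicate(value)) {
          observer(value);
        }
      });
    },
  };
}

// Rough stand-in for share(): subscribes upstream once, on the first
// subscriber, and fans values out to everyone through an inner subject.
function tinyShare<T>(source: TinySource<T>): TinySource<T> {
  const hub = new TinySubject<T>();
  let connected = false;
  return {
    subscribe(observer: TinyObserver<T>): void {
      hub.subscribe(observer);
      if (!connected) {
        connected = true;
        source.subscribe((value: T) => hub.next(value));
      }
    },
  };
}

let sharedPredicateCalls = 0;
const sourceSubject = new TinySubject<number>();
const sharedEvens = tinyShare(
  tinyFilter(sourceSubject, (value: number) => {
    sharedPredicateCalls++;
    return value % 2 === 0;
  })
);

const sharedA: number[] = [];
const sharedB: number[] = [];
sharedEvens.subscribe((value: number) => sharedA.push(value));
sharedEvens.subscribe((value: number) => sharedB.push(value));

sourceSubject.next(1);
sourceSubject.next(2);

// The predicate ran once per value, yet both subscribers received 2.
console.log(sharedPredicateCalls, sharedA, sharedB);
```

Because only the inner hub observes the filtered source, the predicate runs twice for two next() calls regardless of how many subscribers are attached downstream.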