I'm trying to create a stream/observable that...
The concrete case is that I need an observable that makes an Async API call whenever a particular event happens, but only if it has subscribers. I'm trying to avoid unnecessary API calls.
I've managed to create a stream that only fires when it has subscribers like this...
let dataStream = Rx.Observable
.interval(1000) // Fire an event every second
.singleInstance() // Only do something when we have subscribers
.startWith(null) // kick start as soon as something subscribes
.flatMapLatest(interval => SomeAPI.someDataGet()) // get data, returns a promise
And this works. If I console.log(...) in the SomeAPI.someDataGet method, I only see it firing when the stream has subscribers. My implementation also looks really nice, because subscribing and unsubscribing like this fits in very nicely with React component lifecycle methods.
let sub1;
sub1 = dataStream.subscribe(x => console.log('sub1', x));
sub1.dispose();
I also want any new subscribers to receive the latest value the instant they subscribe. This is where I'm struggling. If I do this...
let sub1, sub2;
sub1 = dataStream.subscribe(x => console.log('sub1', x));
setTimeout( () => {
sub2 = dataStream.subscribe(x => console.log('sub2', x));
}, 1500)
...I don't see the console.log for sub2 until the next interval.
If my understanding is correct, I need a hot observable. So I have tried to create a stream like this...
let dataStream = Rx.Observable
.interval(1000) // Fire an event every second
.singleInstance() // Only do something when we have subscribers
.startWith(null) // kick start as soon as something subscribes
.flatMapLatest(interval => SomeAPI.someDataGet()) // get data
.publish(); // Make this a hot observable
Which, as I understand it, should make dataStream a hot observable.
However, in my tests the second subscription still doesn't receive data until the next interval. In addition, this would introduce the requirement to connect and disconnect the dataStream when subscribing which is something I would like to avoid if possible.
I'm brand new to RxJS and I would not be surprised if I've misunderstood what's happening here.
Instead of .publish(), use .shareReplay(1).
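Why this helps: in RxJS 4, .shareReplay(1) is shorthand for .replay(1).refCount(). The replay part caches the most recent value and hands it to each new subscriber immediately, and the refCount part connects to the source on the first subscription and disconnects on the last dispose, so there is no manual connect() call. The snippet below is a dependency-free sketch of that behaviour, not RxJS's actual implementation; createSharedReplay, connectSource and the hand-driven push source are hypothetical names used only for illustration.

```javascript
// Sketch of "replay the last value + reference counting".
// createSharedReplay and connectSource are hypothetical names, not RxJS APIs.
function createSharedReplay(connectSource) {
  const entries = new Set();
  let hasValue = false;
  let lastValue;
  let disconnect = null;

  return {
    subscribe(observer) {
      const entry = { observer };

      // Replay the cached value immediately, if there is one.
      if (hasValue) observer(lastValue);
      entries.add(entry);

      // First subscriber: start the underlying source.
      if (entries.size === 1) {
        disconnect = connectSource(value => {
          hasValue = true;
          lastValue = value;
          entries.forEach(e => e.observer(value));
        });
      }

      return {
        dispose() {
          if (!entries.delete(entry)) return; // already disposed
          // Last subscriber gone: stop the underlying source.
          if (entries.size === 0 && disconnect) {
            disconnect();
            disconnect = null;
          }
        }
      };
    }
  };
}

// Hypothetical stand-in for the interval + API call: a source driven by hand.
let push = null;
let connections = 0;
const shared = createSharedReplay(emit => {
  connections += 1;
  push = emit;
  return () => { push = null; };
});

const log = [];
const sub1 = shared.subscribe(x => log.push('sub1 ' + x));
push('a');
// A late subscriber receives the cached value as soon as it subscribes.
const sub2 = shared.subscribe(x => log.push('sub2 ' + x));
push('b');
sub1.dispose();
sub2.dispose(); // last subscriber gone, so the source is disconnected
```

Applied to the stream in the question, this is the behaviour you are after: sub2 sees the latest data the moment it subscribes, the API is only polled while at least one subscription is alive, and subscribe/dispose remain the only calls your components need to make.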