I have a specific scenario where I have implemented an PublishSubject to emit items based on a custom Event. For each item that will be emitted I also need to persist that value (an expensive operation).
What I'm trying to achieve is to have a function (like the map
) that will be called once for all the observers, and afterwards the item to be received by each observer through the onNext()
method.
The subject:
static final PublishSubject<SomeResult> commonSubject = PublishSubject.create()
The trigger(emitting items):
commonSubject.onNext(new SomeResult())
Expose the subject(will be used by the controller):
public static Observable<SomeResult> observeResults() {
return commonSubject.share();
}
Controller:
public Observable<SomeResult> observeResults() {
return CustomConsumer.observeResults()
.observeOn(Schedulers.single());
}
Subscribers:
CustomControllerResult.observeResults().subscribe(result -> doSomething());
CustomControllerResult.observeResults().subscribe(result -> doSomethingElse());
Each Observer receives the items as expected, but if I add the expensive operation to the controller, this will be called for each observer(something that I do not want):
public Observable<SomeResult> observeResults() {
return CustomConsumer.observeResults()
.observeOn(Schedulers.single())
.compose(persistResult())
.compose(logResult())
.share();
}
Any ideas on how I can achieve the desired result?
The problem is that every time observeResults()
is called it creates a new Observable with the share
operator. But the Observable created is not being shared with the subscribers.
You can change your code to:
Observable<SomeResult> observable = CustomControllerResult.observeResults()
observable.subscribe(result -> doSomething());
observable.subscribe(result -> doSomethingElse());
Or you can change the observeResults
method to return the shared Observable:
static final PublishSubject<SomeResult> commonSubject = PublishSubject.create()
static final Observable<SomeResult> observable = commonSubject
.observeOn(Schedulers.single())
.compose(persistResult())
.compose(logResult())
.share();
public static Observable<SomeResult> observeResults() {
return observable;
}