android rx-java2 publishsubject

PublishSubject calls expensive function for all Observers


I have a specific scenario where I have implemented a PublishSubject to emit items based on a custom event. Each emitted item also needs to be persisted (an expensive operation). What I'm trying to achieve is a function (like map) that is called once per item, regardless of how many observers there are, after which each observer receives the item through onNext().

The subject:

static final PublishSubject<SomeResult> commonSubject = PublishSubject.create();

The trigger (emitting items):

commonSubject.onNext(new SomeResult());

Expose the subject (will be used by the controller):

public static Observable<SomeResult> observeResults() {
    return commonSubject.share();
}

Controller:

public Observable<SomeResult> observeResults() {
    return CustomConsumer.observeResults()
            .observeOn(Schedulers.single());
}

Subscribers:

CustomControllerResult.observeResults().subscribe(result -> doSomething());
CustomControllerResult.observeResults().subscribe(result -> doSomethingElse());

Each Observer receives the items as expected, but if I add the expensive operation to the controller, it is called once per observer (which I do not want):

public Observable<SomeResult> observeResults() {
    return CustomConsumer.observeResults()
            .observeOn(Schedulers.single())
            .compose(persistResult())
            .compose(logResult())
            .share();
}

Any ideas on how I can achieve the desired result?


Solution

  • The problem is that every call to observeResults() builds a new Observable chain ending in its own share(). Each subscriber therefore subscribes to a separate chain, so nothing is actually shared and persistResult() runs once per subscriber.

    You can change your code to:

    Observable<SomeResult> observable = CustomControllerResult.observeResults();
    observable.subscribe(result -> doSomething());
    observable.subscribe(result -> doSomethingElse());
    

    Or you can build the shared Observable once and have observeResults() return that same instance:

    static final PublishSubject<SomeResult> commonSubject = PublishSubject.create();
    
    static final Observable<SomeResult> observable = commonSubject
        .observeOn(Schedulers.single())
        .compose(persistResult())
        .compose(logResult())
        .share();
    
    public static Observable<SomeResult> observeResults() {
        return observable;
    }
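
To see why the number of chains matters, here is a minimal library-free sketch of the same situation. It is a plain-Java model, not RxJava: `Source` and `SharedStage` are hypothetical stand-ins for a hot subject and a shared stage, and the `persist` function only counts its calls in place of `persistResult()`. Unlike the real `share()`, this model subscribes to its upstream eagerly, which is a simplification that should not affect the counts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

public class SharedPipelineSketch {

    /** Hypothetical stand-in for a hot source such as a PublishSubject. */
    static final class Source<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();

        void subscribe(Consumer<T> listener) {
            listeners.add(listener);
        }

        void onNext(T item) {
            // Deliver the item synchronously to every registered listener.
            for (Consumer<T> listener : listeners) {
                listener.accept(item);
            }
        }
    }

    /** Hypothetical stand-in for a shared stage: runs `step` once per item, then fans out. */
    static final class SharedStage<T> {
        private final Source<T> downstream = new Source<>();

        SharedStage(Source<T> upstream, Function<T, T> step) {
            // A single upstream subscription per stage, so `step` runs once per item per stage.
            upstream.subscribe(item -> downstream.onNext(step.apply(item)));
        }

        void subscribe(Consumer<T> listener) {
            downstream.subscribe(listener);
        }
    }

    /** Models the question: a new stage is built for every subscriber. */
    public static int persistCallsWithStagePerCall() {
        AtomicInteger persistCalls = new AtomicInteger();
        Source<String> subject = new Source<>();
        Function<String, String> persist = item -> {
            persistCalls.incrementAndGet(); // stands in for the expensive operation
            return item;
        };
        new SharedStage<>(subject, persist).subscribe(item -> { });
        new SharedStage<>(subject, persist).subscribe(item -> { });
        subject.onNext("result");
        return persistCalls.get();
    }

    /** Models the fix: one stage is built once and reused by every subscriber. */
    public static int persistCallsWithSingleStage() {
        AtomicInteger persistCalls = new AtomicInteger();
        Source<String> subject = new Source<>();
        Function<String, String> persist = item -> {
            persistCalls.incrementAndGet(); // stands in for the expensive operation
            return item;
        };
        SharedStage<String> shared = new SharedStage<>(subject, persist);
        shared.subscribe(item -> { });
        shared.subscribe(item -> { });
        subject.onNext("result");
        return persistCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("stage per call: " + persistCallsWithStagePerCall()); // prints 2
        System.out.println("single stage:   " + persistCallsWithSingleStage());  // prints 1
    }
}
```

With two subscribers and one emitted item, the per-call variant runs the counting step twice, while the single-stage variant runs it once. This is the same difference as between calling a method that rebuilds the chain for each subscriber and handing out one stored Observable.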