javarx-java2publishsubject

RXJava2 manage subscriptions


I need to clarify about what is the best approach for managing the specific scenario using RxJava2 (the whole app structure is based on it):

In my app, a lot of people can make changes in the same document, so I need to deliver every change to everyone looking at the document. But this object is very complex and heavy, so I need to remove it from memory when the last person closes it. And more: The document can be a child from another document, so every change in the parent document must be sent to all children.

What I have done so far: I created a manager so every document request comes to it. Whenever someone needs to work on the document, I watch in a Map if the document is already open. If it is not, I create a BaseDocument instance, that receives the data from the document and a PublishSubject to distribute the events and add to that map. Then I subscribe the user`s Observer on the PublishSubject to get the changes. Whenever the user needs to change something, it sends the change to the BaseDocument, it makes the change and sends the new version through onNext() to everybody. So far so good.

My problem is that I can't control when someone disposes the document observer, so I can't control when the document is not needed anymore so I can persist any unsaved changes and destroy the object. I can't find any subscribe list or anything like it except for "hasObservers()" and I don't want to add a timer to poll it to close if all is done.

My "miraculous answer" would be a callback that is called when the last subscriber disposes, so I could just clean the house and throw the whole object away, but I can't find anything like this. So, how can I manage the subscriptions?


Solution

  • One of the approaches would be to count the number of subscribes and disposes. And if the number hits 0, remove the document. It would look something like this:

    int numberOfSubscribers = 0;
    
    ...
    
    public Observable<T> expose(){
        return subject.asObservable()
            .doOnSubscribe(()-> numberOfSubscribers++)
            .doOnDispose(()-> {
                numberOfSubscribers--;
                if (numberOfSubscribers == 0){
                  //remove the object
                 }
            });
    

    Of course you need to add support for the concurrency problem here (synchronized/atomic int), this is just a draft.

    Hope this helps :)