rx-javarx-java2publishsubject

RxJava - how to stop PublishSubject from publishing even if onNext() is called


I have a look in which i am calling the following:

// class member
var myPublishSubject = PublishSubject.create<SomeObservable>()
// later on in the class somewhere:    
while(true){
   myPublishSubject.onNext(someObservable)
}

I would like to stop the emission but have the while loop continue forever. So I want the onNext call to do nothing. But I'm worried that if I call myPublishSubject.onComplete() that eventually the subject will be null and I will get a NPE. Is there anymore just to silence it even if onNext() is repeatedly called. Is the best way just to unsubscribe?


Solution

  • Few notes

    This is a pretty much a rare case, but if you can show us your real intention with the Observable, we might help you out architecting it, if not best, better.

    What you can do

    For my examples, I have only used a flag variable which is pretty straightforward, this could be changed on whatever trigger you have for your project.

    Option 1

    You can directly invoke onComplete on the subject publisher

    val maxEmittedItemCount = 10
    var currentEmittedItemCount = 0
    val someStringValue = "Some observable" // Create whatever observable you have
    val publishSubject = PublishSubject.create<String>()
    
    publishSubject.subscribe({
        currentEmittedItemCount++
        println(it)
    }, {
        println(it)
    })
    
    while (currentEmittedItemCount != maxEmittedItemCount) {
        // Print indication that the loop is still running
        println("Still looping")
    
        // Publish value on the subject
        publishSubject.onNext(someStringValue)
    
        // Test flag for trigger
        if (currentEmittedItemCount == maxEmittedItemCount) publishSubject.onComplete()
    }
    

    Option 2

    You can also hold a reference to the subscription then dispose it afterwards, this is a little bit more semantic than the previous one as it will execute the code block without calling onNext(t) when the resource is disposed.

    lateinit var disposable: Disposable // Will hold reference to the subscription
    var maxEmittedItemCount = 10
    var currentEmittedItemCount = 0
    var someStringValue = "Some observable" // Create whatever observable you have
    var publishSubject = PublishSubject.create<String>()
    
    disposable = publishSubject.subscribeWith(object : DisposableObserver<String>() {
        override fun onComplete() {
            // Print indication of completion for the subject publisher
            System.out.println("Complete")
        }
    
        override fun onNext(t: String) {
            // Test flag count synchonizer
            currentEmittedItemCount++
    
            // Print out current emitted item count
            System.out.println(currentEmittedItemCount)
    
            // Print current string
            System.out.println(t)
        }
    
        override fun onError(e: Throwable) {
            // Print error
            System.out.println(e)
        }
    })
    
    while (currentEmittedItemCount != maxEmittedItemCount) {
        // Publish value on the subject
        if (!disposable.isDisposed) publishSubject.onNext(someStringValue)
    
        // Test flag for trigger
        if (currentEmittedItemCount == maxEmittedItemCount) {
            publishSubject.onComplete() // optional if you need to invoke `onComplete()` block on the subject
            disposable.dispose()
        }
    
        // Print indication that the loop is still running
        System.out.println("Still looping")
    }
    

    Read more on