I have a loop in which I am calling the following:
// class member
var myPublishSubject = PublishSubject.create<SomeObservable>()
// later on in the class somewhere:
while(true){
myPublishSubject.onNext(someObservable)
}
I would like to stop the emission but have the while loop continue forever, so I want the onNext call to do nothing. But I'm worried that if I call myPublishSubject.onComplete(), the subject will eventually be null and I will get an NPE. Is there any way to just silence it, even if onNext() is called repeatedly? Is the best way just to unsubscribe?
A few notes
This is a fairly rare case. If you can show us what you actually intend to do with the Observable, we might be able to help you architect it better, if not in the best way.
What you can do
In my examples I use a simple counter as the flag, which is pretty straightforward. You can replace it with whatever trigger your project has.
Option 1
You can invoke onComplete directly on the subject.
val maxEmittedItemCount = 10
var currentEmittedItemCount = 0
val someStringValue = "Some observable" // Create whatever observable you have
val publishSubject = PublishSubject.create<String>()
publishSubject.subscribe({
    currentEmittedItemCount++
    println(it)
}, {
    println(it)
})
while (currentEmittedItemCount != maxEmittedItemCount) {
    // Print indication that the loop is still running
    println("Still looping")
    // Publish value on the subject
    publishSubject.onNext(someStringValue)
    // Test flag for trigger
    if (currentEmittedItemCount == maxEmittedItemCount) publishSubject.onComplete()
}
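On the NPE worry from the question: completing a subject does not null out the reference, and a PublishSubject simply ignores onNext calls that arrive after it has terminated. Here is a minimal sketch of that behaviour, assuming RxJava 2.x package names (`io.reactivex`); the function name `emitAroundCompletion` is made up for illustration.

```kotlin
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: returns the values the subscriber actually received.
fun emitAroundCompletion(): List<String> {
    val received = mutableListOf<String>()
    val subject = PublishSubject.create<String>()
    subject.subscribe { received += it }

    subject.onNext("before") // delivered to the subscriber
    subject.onComplete()     // terminates the subject; the reference stays non-null
    subject.onNext("after")  // dropped silently, no exception is thrown
    return received
}

fun main() {
    println(emitAroundCompletion()) // prints [before]
}
```

Keep in mind that completion is terminal: a completed subject cannot be restarted, and anyone who subscribes later only receives the completion signal.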
Option 2
You can also hold a reference to the subscription and dispose of it afterwards. This is a little more semantic than the previous option, because the loop body keeps executing without calling onNext(t) once the resource is disposed.
lateinit var disposable: Disposable // Will hold reference to the subscription
var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()
disposable = publishSubject.subscribeWith(object : DisposableObserver<String>() {
    override fun onComplete() {
        // Print indication of completion for the subject publisher
        System.out.println("Complete")
    }
    override fun onNext(t: String) {
        // Test flag count synchronizer
        currentEmittedItemCount++
        // Print out current emitted item count
        System.out.println(currentEmittedItemCount)
        // Print current string
        System.out.println(t)
    }
    override fun onError(e: Throwable) {
        // Print error
        System.out.println(e)
    }
})
while (currentEmittedItemCount != maxEmittedItemCount) {
    // Publish value on the subject
    if (!disposable.isDisposed) publishSubject.onNext(someStringValue)
    // Test flag for trigger
    if (currentEmittedItemCount == maxEmittedItemCount) {
        publishSubject.onComplete() // optional if you need to invoke `onComplete()` block on the subject
        disposable.dispose()
    }
    // Print indication that the loop is still running
    System.out.println("Still looping")
}
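One practical difference from Option 1 is that disposing only detaches that one subscriber: the subject itself stays alive, so onNext calls made while nobody is subscribed are dropped, and a later subscriber can still receive new values. A rough sketch of this, again assuming RxJava 2.x package names; `disposeThenResubscribe` is a hypothetical name used only for this example.

```kotlin
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: records which subscriber saw which value.
fun disposeThenResubscribe(): List<String> {
    val log = mutableListOf<String>()
    val subject = PublishSubject.create<String>()

    val first = subject.subscribe { log += "first:$it" }
    subject.onNext("a") // delivered to the first subscriber
    first.dispose()     // detach the first subscriber only

    subject.onNext("b") // no observers attached, so the value is dropped

    subject.subscribe { log += "second:$it" }
    subject.onNext("c") // delivered to the second subscriber
    return log
}

fun main() {
    println(disposeThenResubscribe()) // prints [first:a, second:c]
}
```

If the loop really has to run forever and emissions may need to resume later, disposing is probably the better fit. If the stream is finished for good, onComplete states that intent more clearly.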
Read more on