kotlin, rx-java2, rx-android, android-jetpack-compose, rx-kotlin

How can doOnNext consume only the first n items of a PublishSubject?


I have a PublishSubject:

val myPublishSubject = remember {
    PublishSubject.create<Long>().apply {
        doOnNext {
            Logger.debug(TAG) { "Got new Long $it" }
        }
            .takeUntil(someObservable)
            .subscribe()
    }
}

and I have a service that receives a stream of Long values, somewhere in another function:

while(notFinished){
    val newLong = getSomeLong()
    myPublishSubject.onNext(newLong)
}

Here, the doOnNext {} above keeps logging forever. How can I allow, for instance, only the first 20 Long values through? .take(20) did not work!


Solution

  • take(20) should work; this test passes:

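    // Imports assume RxJava 2 (per the question's tags), kotlinx.coroutines and JUnit 4.
    import io.reactivex.schedulers.Schedulers
    import io.reactivex.subjects.PublishSubject
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import org.junit.Assert.assertEquals
    import org.junit.Test
    import java.util.concurrent.atomic.AtomicInteger

    @Test
    fun consumeOnlyFirst20ValueTest() {
        val ps = PublishSubject.create<Long>()

        // Producer: emits 1..1000 from a background coroutine, one value every 50 ms.
        CoroutineScope(Dispatchers.Default).launch {
            for (i in 1L..1000L) {
                ps.onNext(i)
                delay(50)
            }
        }

        // Incremented on the producer thread, read on the test thread.
        val count = AtomicInteger()
        ps.take(20) // the downstream completes after 20 items
            .doOnNext { count.incrementAndGet() }
            .subscribeOn(Schedulers.io())
            .subscribe()

        // 20 items at 50 ms each take about 1 s, so 6 s leaves ample margin.
        Thread.sleep(6000)
        assertEquals(20, count.get())
    }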
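
Since the failing code is not shown, the following is only a guess at why take(20) appeared not to work: RxJava operators do not modify the subject, they return a new Observable, so take(20) has to sit before doOnNext in the same chain that is subscribed. A minimal sketch of that arrangement, assuming RxJava 2 (`io.reactivex` packages); `loggedValues` is a hypothetical helper, and the `for` loop stands in for the service loop from the question:

```kotlin
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: pushes the values 1..total into a subject and returns
// the values that actually reached doOnNext.
fun loggedValues(limit: Long, total: Long): List<Long> {
    val logged = mutableListOf<Long>()
    val subject = PublishSubject.create<Long>()

    // take(limit) comes before doOnNext in the chain that is subscribed,
    // so the side effect runs for at most `limit` items.
    subject
        .take(limit)
        .doOnNext { logged.add(it) }
        .subscribe()

    // Stand-in for the service loop; the subject itself keeps accepting
    // values, but the chain above has already completed after `limit` items.
    for (value in 1L..total) {
        subject.onNext(value)
    }
    return logged
}

fun main() {
    println(loggedValues(limit = 20, total = 1000).size) // prints 20
}
```

In the question's snippet this would mean writing `take(20).doOnNext { ... }.takeUntil(someObservable).subscribe()` inside the `apply` block. Calling `take(20)` as a separate statement and discarding its result leaves the original chain unchanged.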