Tags: java, kotlin, rx-java2, rx-kotlin2

How to call a method multiple times using RxJava's retryWhen operator?


I have a method that performs a network request and returns an Observable of a mutable list of a data class. Sometimes the request fails with a 403 error. When that happens, I need to call YouTubeClient.getApiKey() to obtain a new API key and then repeat the network request. How can I do this? I have read many similar questions but have not found a working solution.

Here is the utility method in which I try to use retryWhen():

private fun searchRequestWrapper(query: String): Observable<MutableList<Video>> {
    return youTubeClient.searchRequest(
        YouTubeClient.URL_SNIPPET,
        YouTubeClient.MAX_RESULT, query,
        YouTubeClient.API_KEY
    )
        .retryWhen { errors -> errors
            .zipWith(Observable.range(1, 3)) { error, a ->
                YouTubeClient.getApiKey()
                error
            }
        }
        .map {it.items}
}

The main method, which calls the utility method, is below:

fun fetchVideos(query:String) {
    _networkState.set(NetworkState.LOADING)
    Log.e("NetworkState", networkState.get()?.status.toString())

    try {
        compositeDisposable.add(
            searchRequestWrapper(query)
            .flatMapIterable {it}
            .flatMap { video -> videoInfoWrapper(video.videoId).subscribeOn(Schedulers.io()) }
            .toList()
            .subscribeOn(Schedulers.io())
            .subscribe({
                Log.e("new videosId",it.toString())
                downloadedVideosList.postValue(it)
                _networkState.set(NetworkState.LOADED)
                Log.e("NetworkState", networkState.get()?.status.toString())
                _networkState.set(NetworkState.WAITING)
                Log.e("NetworkState", networkState.get()?.status.toString())
            },{
                errorHandle(it)
            }))
    }
    catch (e: Exception){
        Log.e("fetchVideos",e.message)
    }
}

Solution

  • What about this solution?

    You are using retryWhen incorrectly. The function passed to retryWhen receives an Observable of the upstream errors and returns a notifier Observable whose only job is to signal whether a retry should happen: each time the notifier emits a value, the source is resubscribed.
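
    To illustrate that behaviour, here is a minimal sketch assuming RxJava 2 (io.reactivex). The flakySource and retryDemo functions are hypothetical and exist only for this illustration: the source fails on its first two subscriptions, and every error that passes through the notifier triggers one resubscription.

    ```kotlin
    import io.reactivex.Observable
    import java.util.concurrent.atomic.AtomicInteger

    // Hypothetical source: fails on the first two subscriptions, succeeds on the third.
    fun flakySource(subscriptions: AtomicInteger): Observable<String> =
        Observable.defer {
            val attempt = subscriptions.incrementAndGet()
            if (attempt < 3) Observable.error<String>(RuntimeException("attempt $attempt failed"))
            else Observable.just("ok on attempt $attempt")
        }

    // Returns the emitted value together with the number of subscriptions that were needed.
    fun retryDemo(): Pair<String, Int> {
        val subscriptions = AtomicInteger(0)
        val value = flakySource(subscriptions)
            .retryWhen { errors ->
                // Each error forwarded here triggers one resubscription to the source.
                // take(3) forwards at most three errors, so at most three retries happen.
                errors.take(3)
            }
            .blockingFirst()
        return value to subscriptions.get()
    }

    fun main() {
        val (value, subscriptions) = retryDemo()
        println(value)          // prints "ok on attempt 3"
        println(subscriptions)  // prints 3
    }
    ```

    Note that when the notifier completes (here, once take(3) is exhausted), the resulting sequence completes instead of failing, so a notifier that should surface the last error has to emit that error itself.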

    In the example below, a 403 error is caught by onErrorResumeNext, which intercepts the onError signal and subscribes to a fallback Observable instead. The retryWhen placed after this operator makes sure that any other error is retried up to three times.

    FYI: operators that transform data should only work on immutable data, so MutableList should be avoided.

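    // RxJava 2 and JDK imports used by the code below
    import io.reactivex.Observable
    import io.reactivex.Single
    import io.reactivex.schedulers.TestScheduler
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.atomic.AtomicInteger
    // Assumed test dependencies (not stated in the answer): JUnit 4 and AssertJ
    import org.assertj.core.api.Assertions.assertThat
    import org.junit.Test

    class So65040953 {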
        @Test
        fun main() {
            val client = YouTubeClientInterceptor(YouTubeClientStub())
            val scheduler = TestScheduler()
    
            val test = client.searchRequest("fail")
                    // when 403 -> request key and call search again...
                    // else propagate error
                    .onErrorResumeNext {
                        when (it.message) {
                            "403" -> client.requestApiKey()
                                    .flatMapObservable { key -> client.searchRequest("good") }
                            else -> Observable.error(it)
                        }
                    }
                    // when an error other than 403 happens, retry after a one-second delay, up to three times
                    // the error could be thrown by #searchRequest or #requestApiKey
                    .retryWhen { throwableObservable ->
                        throwableObservable.take(3).delay(1L, TimeUnit.SECONDS, scheduler)
                    }.test()
    
            scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
    
            assertThat(client.requestApiKey.get()).isEqualTo(1)
            assertThat(client.searchRequestCount.get()).isEqualTo(2)
            test.assertNoErrors()
                    .assertValue(listOf(Video("42")))
        }
    }
    
    internal interface YouTubeClient {
        fun searchRequest(query: String): Observable<List<Video>>
    
        fun requestApiKey(): Single<String>
    }
    
    internal class YouTubeClientInterceptor(private val client: YouTubeClient) : YouTubeClient {
        internal val searchRequestCount = AtomicInteger(0)
        internal val requestApiKey = AtomicInteger(0)
    
        override fun searchRequest(query: String): Observable<List<Video>> {
            searchRequestCount.incrementAndGet()
            return client.searchRequest(query)
        }
    
        override fun requestApiKey(): Single<String> {
            requestApiKey.incrementAndGet()
            return client.requestApiKey()
        }
    }
    
    internal class YouTubeClientStub : YouTubeClient {
        override fun searchRequest(query: String): Observable<List<Video>> {
            println("query: $query")
    
            return when (query) {
                "fail" -> Observable.error<List<Video>>(RuntimeException("403"))
                "good" -> Observable.just(listOf(Video("42")))
                else -> Observable.empty()
            }
        }
    
        override fun requestApiKey(): Single<String> {
            println("requestApiKey")
    
            return Single.just("1-1-1-1")
        }
    }
    
    internal data class Video(private val videoId: String)
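
    As a variation closer to the question's setup, the sketch below refreshes the key inside retryWhen itself. It is only an illustration under stated assumptions: HttpError, FakeClient, fetchNewKey and searchWithKeyRefresh are hypothetical names, not the real YouTubeClient API, and RxJava 2 is assumed. The point it tries to show is that the request is wrapped in Observable.defer, so the key is read again on every resubscription. In the question's code the key appears to be read once, when the chain is built, so a retry would repeat the request with the old key.

    ```kotlin
    import io.reactivex.Observable
    import io.reactivex.Single
    import java.util.concurrent.atomic.AtomicInteger

    // Hypothetical error type standing in for an HTTP failure.
    class HttpError(val code: Int) : RuntimeException("HTTP $code")

    // Hypothetical stand-in for the question's client.
    class FakeClient {
        @Volatile var apiKey: String = "expired-key"
        val searchCalls = AtomicInteger(0)

        // Fails with 403 unless the refreshed key is used.
        fun search(query: String, key: String): Observable<List<String>> {
            searchCalls.incrementAndGet()
            return if (key == "fresh-key") Observable.just(listOf("video-for-$query"))
            else Observable.error<List<String>>(HttpError(403))
        }

        // Pretends to request a new key.
        fun fetchNewKey(): Single<String> = Single.fromCallable { "fresh-key" }
    }

    fun searchWithKeyRefresh(
        client: FakeClient,
        query: String,
        maxRetries: Int = 3
    ): Observable<List<String>> =
        // defer re-reads client.apiKey on every (re)subscription, so a retry uses the new key.
        Observable.defer { client.search(query, client.apiKey) }
            .retryWhen { errors ->
                val retries = AtomicInteger(0)
                errors.flatMap { error ->
                    if (error is HttpError && error.code == 403 && retries.incrementAndGet() <= maxRetries) {
                        // Refresh the key first; the emitted item then triggers the retry.
                        client.fetchNewKey()
                            .doOnSuccess { client.apiKey = it }
                            .toObservable()
                    } else {
                        // Other errors, or too many 403s, end the sequence with that error.
                        Observable.error<String>(error)
                    }
                }
            }

    fun main() {
        val client = FakeClient()
        println(searchWithKeyRefresh(client, "cats").blockingFirst()) // prints [video-for-cats]
        println(client.searchCalls.get())                             // prints 2
    }
    ```

    Counting retries inside flatMap, rather than using take(3), is a deliberate choice in this sketch: once the limit is reached the last error is passed downstream instead of the sequence completing silently.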