
Kotlin Flow<T> with Resilience4j RateLimiter and Retry


I'm using Resilience4j 1.7.1, Kotlin 1.7.0, and Kotlin Coroutines 1.6.1.

I'd like to use RateLimiter and Retry in Kotlin code, but the documentation doesn't explain how to use them with Kotlin Flow.

Here is a simple example:

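import io.github.resilience4j.kotlin.ratelimiter.rateLimiter
import io.github.resilience4j.kotlin.retry.retry
import io.github.resilience4j.ratelimiter.RateLimiter
import io.github.resilience4j.ratelimiter.RateLimiterConfig
import io.github.resilience4j.retry.Retry
import io.github.resilience4j.retry.RetryConfig
import java.time.Duration
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main() {
    // At most 2 permits per 1-second window; wait up to 2 seconds for a permit
    val rateLimiterConfig = RateLimiterConfig.custom()
        .limitForPeriod(2)
        .limitRefreshPeriod(Duration.ofSeconds(1))
        .timeoutDuration(Duration.ofSeconds(2))
        .build()
    val rateLimiter = RateLimiter.of("rate-limiter", rateLimiterConfig)

    // Up to 3 attempts, retrying on any Exception
    val retryConfig = RetryConfig.custom<Any>()
        .maxAttempts(3)
        .retryExceptions(Exception::class.java)
        .build()
    val retry = Retry.of("retry", retryConfig)

    coroutineScope {
        flowOf(1, 2, 3, 4, 5, 6, 7, 8)
            .rateLimiter(rateLimiter)
            .retry(retry)
            .map { async { process(it) } }
            .toList().awaitAll()
    }
}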

suspend fun process(num: Int): Int {
    // getTime() is a helper (not shown) that formats the current time as mm:ss.SSS
    println("time: ${getTime()}, value: $num")
    if (num >= 8) throw Exception()
    delay(1000)
    return num * num
}

But I get neither rate limiting nor retries. When I run this code, printing the time (mm:ss.SSS) and the incoming value, I get this:

time: 46:26.488,value: 7
time: 46:26.488,value: 4
time: 46:26.488,value: 3
time: 46:26.488,value: 1
time: 46:26.488,value: 6
time: 46:26.488,value: 5
time: 46:26.488,value: 8
time: 46:26.488,value: 2
Exception in thread "main" java.lang.Exception
    at MainKt.process(Main.kt:165)
    at MainKt$main$2$1$1.invokeSuspend(Main.kt:142)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:749)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)

How does it work?


Solution

  • I think this is what you want:

    coroutineScope {
        flowOf(1,2,3,4,5,6,7,8)
            .rateLimiter(rateLimiter)
            .map { process(it) }
            .retry(retry)
            .toList()
    }
    

    1. Retries

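    The retry operator from Resilience4j uses Flow.retryWhen under the hood, which only reacts to exceptions thrown upstream of it. To make it work, you have to apply it after the .map call that invokes process. Note also that .retry re-collects the whole flow from the start, not only the failed operation.

    From the kotlinx.coroutines docs for retryWhen: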

    Retries collection of the given flow when an exception occurs in the upstream flow and the predicate returns true.

    This operator is transparent to exceptions that occur in downstream flow and does not retry on exceptions that are thrown to cancel the flow.
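To make the placement rule concrete, here is a minimal sketch that uses plain kotlinx.coroutines retryWhen rather than the Resilience4j operator (which, per the answer above, delegates to it). FlakyProcessor, retryAfterMap and retryBeforeMap are hypothetical names invented for this illustration; the processor fails once on the value 3 so that the effect of a retry is visible.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for process(): throws the first time it sees 3.
class FlakyProcessor {
    val seen = mutableListOf<Int>() // every value that reached process()
    private var failedOnce = false

    fun process(num: Int): Int {
        seen += num
        if (num == 3 && !failedOnce) {
            failedOnce = true
            throw IllegalStateException("transient failure on $num")
        }
        return num * num
    }
}

// retryWhen AFTER map: the exception is thrown upstream, so the flow is re-collected.
fun retryAfterMap(p: FlakyProcessor): List<Int> = runBlocking {
    flowOf(1, 2, 3)
        .map { p.process(it) }
        .retryWhen { _, attempt -> attempt < 2 }
        .toList()
}

// retryWhen BEFORE map: the exception is thrown downstream, so it just propagates.
fun retryBeforeMap(p: FlakyProcessor): Result<List<Int>> = runCatching {
    runBlocking {
        flowOf(1, 2, 3)
            .retryWhen { _, attempt -> attempt < 2 }
            .map { p.process(it) }
            .toList()
    }
}

fun main() {
    val a = FlakyProcessor()
    println(retryAfterMap(a))            // [1, 4, 1, 4, 9]
    println(a.seen)                      // [1, 2, 3, 1, 2, 3]

    val b = FlakyProcessor()
    println(retryBeforeMap(b).isFailure) // true
    println(b.seen)                      // [1, 2, 3]
}
```

The first result contains 1 and 4 twice: the retry restarts collection from the first element, and values already emitted downstream before the failure are not taken back. If that duplication matters, one option is to retry inside the map lambda per element instead of retrying the flow.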

    2. Rate limiting

    Wrapping each call in async { } and then calling .awaitAll() starts all the process calls concurrently, so the rateLimiter won't be able to do its job. Just use .map { process(it) }.
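The effect of async on ordering can be seen without Resilience4j at all. The sketch below is only an illustration under that assumption: tracedProcess, sequentialEvents and concurrentEvents are hypothetical names, and the delays are shortened. It records when each call starts and ends, first with a plain map and then with map plus async.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for process() that records when it starts and ends.
suspend fun tracedProcess(num: Int, events: MutableList<String>): Int {
    events += "start $num"
    delay(20L * num) // different delays keep the completion order deterministic
    events += "end $num"
    return num * num
}

// Plain map: the flow waits for each call to finish before taking the next element.
fun sequentialEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flowOf(1, 2).map { tracedProcess(it, events) }.toList()
    events
}

// map + async: the flow only schedules the work and moves on immediately,
// so every call is already in flight by the time awaitAll() is reached.
fun concurrentEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flowOf(1, 2).map { async { tracedProcess(it, events) } }.toList().awaitAll()
    events
}

fun main() {
    println(sequentialEvents()) // [start 1, end 1, start 2, end 2]
    println(concurrentEvents()) // [start 1, start 2, end 1, end 2]
}
```

In the async variant the flow itself completes almost instantly, which matches the question's output where all eight values are printed with the same timestamp.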