I have Resilience4j version: 1.7.1, Kotlin version: 1.7.0, Kotlin Coroutines: 1.6.1.
I'd like to use RateLimiter and Retry in kotlin code, but documentations don't contain information how to use Kotlin Flow with them.
I have a simple code:
suspend main() {
val rateLimiterConfig = RateLimiterConfig.custom()
.limitForPeriod(2)
.limitRefreshPeriod(Duration.ofSeconds(1))
.timeoutDuration(Duration.ofSeconds(2))
.build()
val rateLimiter = RateLimiter.of("rate-limiter", rateLimiterConfig)
val retryConfig = RetryConfig.custom<Any>()
.maxAttempts(3)
.retryExceptions(Exception::class.java)
.build()
val retry = Retry.of("retry", retryConfig)
coroutineScope {
flowOf(1,2,3,4,5,6,7,8)
.rateLimiter(rateLimiter)
.retry(retry)
.map { async { process(it) } }
.toList().awaitAll()
}
}
suspend fun process(num: Int): Int {
println("time: ${getTime()}, value: $num")
if(num >= 8) throw Exception()
delay(1000)
return num * num
}
And I don't have any limiting or retry. If run this code with printing time(mm:ss.SSS) and incoming value, I have this:
time: 46:26.488,value: 7
time: 46:26.488,value: 4
time: 46:26.488,value: 3
time: 46:26.488,value: 1
time: 46:26.488,value: 6
time: 46:26.488,value: 5
time: 46:26.488,value: 8
time: 46:26.488,value: 2
Exception in thread "main" java.lang.Exception
at MainKt.process(Main.kt:165)
at MainKt$main$2$1$1.invokeSuspend(Main.kt:142)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:749)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
How does it work?
I think this is what you want:
coroutineScope {
flowOf(1,2,3,4,5,6,7,8)
.rateLimiter(rateLimiter)
.map { process(it) }
.retry(retry)
.toList()
}
retry
from Resilience4j uses Flow.retryWhen
under the hood. To make it work you have to use it after .map
invocation. Also, .retry
operator will retry the whole flow, not only the failed operation.
kotlinx.coroutines
docs:
Retries collection of the given flow when an exception occurs in the upstream flow and the predicate returns true.
This operator is transparent to exceptions that occur in downstream flow and does not retry on exceptions that are thrown to cancel the flow.
Using async { }
and then .awaitAll
kinda parallelizes the whole process, so rateLimiter
won't be able to do its job. Just do .map { process(it) }
.