I am trying to make a wrapper on the following function in Akka streams.
RestartFlow.withBackoff(minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2) {
() => s
}
where s is some source that I am wrapping with the back off. Ideally id like something like this
RetryFlow(s)
Ive managed to create this:
object RetryFlow {
def apply[In, Out, _, T <: Flow[In, Out, _]](source: T, minBackoff: FiniteDuration = 3.seconds, maxBackoff: FiniteDuration = 30.seconds, randomFactor: Double = 0.2): Flow[In, Out, NotUsed] = {
RestartFlow.withBackoff(
minBackoff = minBackoff,
maxBackoff = maxBackoff,
randomFactor = randomFactor) {
() => source
}
}
}
The issue is I need to supply all 3 type params of the flow again at the call site and it looks horrendous
RetryFlow[JustDataEvent, JustDataEvent, NotUsed, Flow[JustDataEvent, JustDataEvent, NotUsed]](s)
It's also not type-safe as I could type any type params here.
What I feel like should be possible but I'm not sure how is to not take the extra type params In and Out but do something like T#In, T#Out etc as I've said that T extends flow, therefore, T Already has the type parameters I need.
How about instead of T
, just taking a parameter of type Flow[In, Out, _]
?
object RetryFlow {
def apply[In, Out, _](
source: Flow[In, Out, _],
minBackoff: FiniteDuration = 3.seconds,
maxBackoff: FiniteDuration = 30.seconds,
randomFactor: Double = 0.2): Flow[In, Out, NotUsed] = {
RestartFlow.withBackoff[In, Out](minBackoff = minBackoff,
maxBackoff = maxBackoff,
randomFactor = randomFactor) { () =>
source
}
}
}
And then we get:
val value: Flow[Int, String, NotUsed] = Flow.fromFunction[Int, String](i => i.toString)
RetryFlow(value)