
Access the type parameters of a generic parameter in Scala


I am trying to write a wrapper around the following function in Akka Streams:

RestartFlow.withBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2) {
  () => s
}

where s is some flow that I am wrapping with the backoff. Ideally I'd like something like this:

RetryFlow(s)

I've managed to create this:

object RetryFlow {
  def apply[In, Out, _, T <: Flow[In, Out, _]](source: T, minBackoff: FiniteDuration = 3.seconds, maxBackoff: FiniteDuration = 30.seconds, randomFactor: Double = 0.2): Flow[In, Out, NotUsed] = {
    RestartFlow.withBackoff(
      minBackoff = minBackoff,
      maxBackoff = maxBackoff,
      randomFactor = randomFactor) {
      () => source
    }
  }
}

The issue is that I need to supply all three of the flow's type parameters again at the call site, which looks horrendous:

RetryFlow[JustDataEvent, JustDataEvent, NotUsed, Flow[JustDataEvent, JustDataEvent, NotUsed]](s)

It's also not type-safe as I could type any type params here.

What I feel should be possible, though I'm not sure how, is to drop the extra type parameters In and Out and use something like T#In, T#Out, etc. instead. Since I've said that T extends Flow, T already has the type parameters I need.


Solution

  • How about instead of T, just taking a parameter of type Flow[In, Out, _]?

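    import scala.concurrent.duration._

    import akka.NotUsed
    import akka.stream.scaladsl.{Flow, RestartFlow}

    object RetryFlow {
      // In and Out are inferred from the argument's type; the materialized
      // value type is left as a wildcard because the result discards it.
      def apply[In, Out](
          source: Flow[In, Out, _],
          minBackoff: FiniteDuration = 3.seconds,
          maxBackoff: FiniteDuration = 30.seconds,
          randomFactor: Double = 0.2): Flow[In, Out, NotUsed] = {
        RestartFlow.withBackoff[In, Out](
          minBackoff = minBackoff,
          maxBackoff = maxBackoff,
          randomFactor = randomFactor) { () =>
          source
        }
      }
    }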
    

    And then we get:

    val value: Flow[Int, String, NotUsed] = Flow.fromFunction[Int, String](i => i.toString)
    RetryFlow(value)
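
To see why taking the flow type directly helps, here is a minimal sketch that compiles without Akka. MiniFlow and Wrap are hypothetical stand-ins invented for illustration (MiniFlow only mimics the variance of Flow's three type parameters); they are not part of any library. The idea it tries to show is that the compiler infers type parameters from the types of the value arguments, so In and Out need to appear in the parameter's type rather than only in the bound of T.

```scala
// Hypothetical stand-in for akka.stream.scaladsl.Flow, with the same variance
// on its three type parameters, so this sketch has no dependencies.
final class MiniFlow[-In, +Out, +Mat](val run: In => Out)

object Wrap {
  // In and Out appear directly in the parameter type, so they can be
  // inferred from the argument; the third type parameter is a wildcard.
  def direct[In, Out](flow: MiniFlow[In, Out, _]): MiniFlow[In, Out, Unit] =
    new MiniFlow[In, Out, Unit](flow.run)

  // The shape from the question, for comparison: only T is tied to the
  // argument, while In and Out occur only in T's bound, so the call below
  // spells every type argument out by hand.
  def bounded[In, Out, T <: MiniFlow[In, Out, _]](flow: T): MiniFlow[In, Out, Unit] =
    new MiniFlow[In, Out, Unit](flow.run)
}

object Demo extends App {
  val toText = new MiniFlow[Int, String, Unit](_.toString)

  // No type arguments needed at the call site.
  val a: MiniFlow[Int, String, Unit] = Wrap.direct(toText)

  // Every type argument written out explicitly, as in the question.
  val b = Wrap.bounded[Int, String, MiniFlow[Int, String, Unit]](toText)

  assert(a.run(42) == "42")
  assert(b.run(7) == "7")
}
```

The same reasoning should carry over to the RetryFlow above: once the parameter is declared as Flow[In, Out, _], there is no separate T left for the caller to get wrong.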