scala, scalaz, scalaz-stream

Controlling the throughput of a Process


I'm trying to control the throughput of a Process[F, A] with a timer Process:

val p: Process[List,Int] = Process.iterateEval(0)(i => List(i + 1))

val timer: Process[Task, Duration] = time.awakeEvery(1 second)(Strategy.DefaultStrategy, Strategy.DefaultTimeoutScheduler)

val p2 = p.zipWith(timer)((v, d) => v)

But the compiler says that p2 is a Process[Object, Int]. According to the zipWith() signature, it should be a Process[Task, Int].

How can I throttle the output of my Process[F, A] if F is not a Task? I tried sleepUntil() but ran into similar issues.


Solution

  • There is usually no reason for F to be anything other than an effectful monad such as Task. If your code does not need any effects, you can use a pure process type such as Process0[O] (an alias for Process[Nothing, O]).

    Your code will work as expected if you change p to

    val p: Process[Task, Int] = Process.iterateEval(0)(i => Task.now(i + 1))
    

    or, if the function passed to iterateEval is pure, simply

    val p: Process0[Int] = Process.iterate(0)(i => i + 1)
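
Putting the pieces together, here is a minimal sketch of the throttled process. The likely cause of the Process[Object, Int] result is that zipWith needs one effect type covering both sides, and List and Task have no useful common supertype, so the compiler falls back to Object. Once p runs in Task, the zip type-checks. The object name ThrottleSketch and the value names throttled and firstThree are made up for illustration, and the final .run assumes a scalaz version where Task exposes run (newer versions use unsafePerformSync instead).

```scala
import scala.concurrent.duration._
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, time}

// Hypothetical wrapper object, only here to make the sketch self-contained.
object ThrottleSketch {

  // Source process running in Task, as suggested in the answer.
  val p: Process[Task, Int] =
    Process.iterateEval(0)(i => Task.now(i + 1))

  // Timer from the question: emits the elapsed duration roughly once per second.
  val timer: Process[Task, Duration] =
    time.awakeEvery(1.second)(Strategy.DefaultStrategy, Strategy.DefaultTimeoutScheduler)

  // Both sides now share the Task effect, so the result is Process[Task, Int].
  // Each element of p is released only when the timer ticks.
  val throttled: Process[Task, Int] =
    p.zipWith(timer)((v, _) => v)

  // Collect the first three throttled elements (blocks for about three seconds).
  // Assumption: Task#run is available; swap in unsafePerformSync on newer scalaz.
  lazy val firstThree: IndexedSeq[Int] =
    throttled.take(3).runLog.run
}
```

Evaluating ThrottleSketch.firstThree should take about three seconds, since each element waits for one timer tick. The same zipWith call should also accept the Process0 variant of p, because Process[Nothing, Int] can widen to Process[Task, Int].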