I stumbled upon an unexpected cats.effect.std.Mutex
behaviour (it doesn't work for a certain case). Looks like I missed some core understanding of Async
but haven't found the root cause.
Imports:
import cats.{Applicative, FlatMap}
import cats.effect.std.{Console, Mutex}
import cats.effect.*
import cats.implicits.*
import cats.syntax.all.*
import scala.concurrent.duration.*
Suppose we have a service
Working scenario
class Service[F[_] : Async : FlatMap : Console](mutex: Mutex[F]) {
def run(name: String): F[Unit] =
for
_ ← Console[F].println(s"[$name] entered [run] within [${Thread.currentThread().getName}]")
_ ← mutex.lock.surround {
for {
_ ← Console[F].println(s"[$name] entered [locked] within [${Thread.currentThread().getName}]")
_ ← Async[F].sleep(2.seconds)
_ ← Console[F].println(s"[$name] almost left [locked] within [${Thread.currentThread().getName}]")
} yield ()
}
_ ← Console[F].println(s"[$name] left [run] within [${Thread.currentThread().getName}]")
yield ()
}
and a runner
object MutextTests extends IOApp:
def run(args: List[String]): IO[ExitCode] =
(for
mutex ← Mutex[IO]
service ← IO.pure(Service(mutex))
f1 ← service.run("Alice").start
f2 ← service.run("Bob").start
_ ← f1.join
_ ← f2.join
yield ()).as(ExitCode.Success)
with expected output
[Alice] entered [run] within [io-compute-2]
[Bob] entered [run] within [io-compute-2]
[Alice] entered [locked] within [io-compute-blocker-5]
[Alice] almost left [locked] within [io-compute-1]
[Bob] entered [locked] within [io-compute-blocker-3]
[Alice] left [run] within [io-compute-blocker-1]
[Bob] almost left [locked] within [io-compute-2]
[Bob] left [run] within [io-compute-blocker-2]
But if I replace Console[F]
with Async[F]
the mutex stops working:
Not working scenario
class Service[F[_]: Async : FlatMap : Console](mutex: Mutex[F]) {
def run(name: String): F[Unit] =
for
_ ← Async[F].pure(println(s"[$name] entered [run] within [${Thread.currentThread().getName}]"))
_ ← mutex.lock.surround {
for {
_ ← Async[F].pure(println(s"[$name] entered [locked] within [${Thread.currentThread().getName}]"))
_ ← Async[F].sleep(2.seconds)
_ ← Async[F].pure(println(s"[$name] almost left [locked] within [${Thread.currentThread().getName}]"))
} yield ()
}
_ ← Async[F].pure(println(s"[$name] left [run] within [${Thread.currentThread().getName}]"))
yield ()
}
with unexpected output
[Alice] entered [run] within [io-compute-1]
[Bob] entered [run] within [io-compute-1]
[Alice] entered [locked] within [io-compute-5]
[Bob] entered [locked] within [io-compute-4]
[Alice] almost left [locked] within [io-compute-5]
[Alice] left [run] within [io-compute-5]
[Bob] almost left [locked] within [io-compute-6]
[Bob] left [run] within [io-compute-6]
The problem is not the Mutex
, but the use of pure
+ println
.
You are printing before constructing the whole program to be passed to the lock.surround
. Thus, giving the impression that both fibers acquired the Mutex
at the same time; but actually both just tried to acquire it at the same time but only one got access to it.
To get back the expected behaviour you should use delay
to suspend the println
(actually the right thing is blocking
, and the best thing is to use Console
or just IO.println
directly).
You can see that is the case here: https://scastie.scala-lang.org/BalmungSan/szV1P8X5SeiEiYhzItPmRA/1
BTW a couple of notes:
Async
+ FlatMap
is redundant, Async
is already a Monad
. I am actually not sure how that didn't cause you a compile error.cats.implicits.*
is deprecated, all you need is cats.syntax.all.*
start
and join
fiber manually, is error-prone. Always prefer higher level combinators like parTupled
.I have a couple of examples and resources to get familiarized with the whole "programs as values" paradigm here: https://github.com/BalmungSan/programs-as-values hope it can be helpful :D (English recording in the process to be recovered).