We wish to accomplish the following on akka stream shutdown
We have the following code for shutting down our actor system (Akka 2.4).
try {
  Await.ready(actorSystem.terminate(), sleepSeconds.seconds)
} catch {
  case ex: Throwable => log.error("Failed to terminate actor system", ex)
}
I am trying to understand the Akka documentation, and it's not clear to me whether we are required to shut down the materializer (for our Akka stream) separately. There is a shutdown method on the materializer and another on the actor system.
In other words, what is the idiomatic sequence of operations for shutting down our Akka stream cleanly and gracefully?
You have to shut down the stream before terminating the actor system. If you don't, elements held in internal buffers may be lost and never reach the final Sink.
Consider the following example with Akka 2.6 (if some APIs differ from 2.4, please adjust accordingly):
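import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

implicit val actorSystem: ActorSystem = ActorSystem("test")
val source = Source(LazyList.fill(100_000)(Random.nextInt(100)))
val flow = Flow[Int]
  .throttle(3, 1.second)
  .map { s =>
    println(s) // side effect: shows every element that passes this stage
    s
  }
  .grouped(4) // buffers up to 4 elements before emitting a group
  .map(s => (s.size, s.max))
val stream = source.via(flow).to(Sink.foreach(println))
val _ = stream.run()
Thread.sleep(5_400)
// Terminates the actor system while the stream is still running
val _ = Await.result(actorSystem.terminate(), 2.seconds)
println("Terminated")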
When it runs, the final output before actor system termination may look like this:
36
84
38
32
(4,84)
29
Terminated
The stream produces element 29, which gets stuck in grouped and never reaches the final Sink.foreach(println).
You can overcome this with the help of a kill switch:
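import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

implicit val actorSystem: ActorSystem = ActorSystem("test")
val killSwitch = KillSwitches.single[Int]
// The kill switch sits directly after the source, before any side effects
val source = Source(LazyList.fill(100_000)(Random.nextInt(100))).viaMat(killSwitch)(Keep.right)
val flow = Flow[Int]
  .throttle(3, 1.second)
  .map { s =>
    println(s)
    s
  }
  .grouped(4)
  .map(s => (s.size, s.max))
// Keep.left keeps the kill switch; Keep.both adds the sink's Future[Done]
val stream = source.viaMat(flow)(Keep.left).toMat(Sink.foreach(println))(Keep.both)
val (kill, done) = stream.run()
Thread.sleep(5_400)
kill.shutdown() // initiate stream completion
val _ = Await.result(done, 2.seconds) // wait until the sink has seen every element
println("Terminated")
val _ = Await.result(actorSystem.terminate(), 2.seconds)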
Now the final output is complete:
10
64
77
6
(4,77)
74
24
(2,74)
Terminated
The output (2,74) proves that grouped was flushed.
Note the use of viaMat and toMat to capture two materialized values (the Akka Streams documentation describes materialized values in more detail): the kill switch materialized by KillSwitches.single[Int], and the Future[Done] materialized by Sink.foreach.
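To illustrate how viaMat and toMat combine materialized values, here is a minimal sketch, assuming Scala 2.13 with akka-stream 2.6 on the classpath (the two leading comment lines are scala-cli directives and only matter if you run it that way). The object name MaterializedValues, the method demo, and the use of Sink.fold in place of Sink.foreach are illustrative choices, not part of the example above.

```scala
//> using scala 2.13
//> using dep com.typesafe.akka::akka-stream:2.6.20

import akka.actor.ActorSystem
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not taken from the answer above.
object MaterializedValues {
  def demo(): Int = {
    implicit val system: ActorSystem = ActorSystem("mat-demo")
    try {
      // Keep.right drops the source's NotUsed and keeps the kill switch.
      val withSwitch: Source[Int, UniqueKillSwitch] =
        Source(1 to 3).viaMat(KillSwitches.single[Int])(Keep.right)

      // Keep.both pairs the kill switch with the sink's Future.
      val graph: RunnableGraph[(UniqueKillSwitch, Future[Int])] =
        withSwitch.toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.both)

      // The source is finite here, so the stream completes on its own
      // and the kill switch is not needed.
      val (_, sum) = graph.run()
      Await.result(sum, 5.seconds)
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }
}
```

The explicit type annotations are only there to show what each combinator keeps. MaterializedValues.demo() sums the three elements and returns 6.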
A KillSwitch can only initiate the termination of a stream; you still have to wait for the stream to complete. You can do that with the Future[Done] from the Sink:
kill.shutdown()
val _ = Await.result(done, 2.seconds)
This initiates a shutdown and then waits for it to complete.
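The whole sequence (stop the stream, wait for it to drain, then terminate the actor system) could be wrapped in a small helper. The following is only a sketch under assumptions: Scala 2.13 with akka-stream 2.6, and the names GracefulShutdown, stopStreamThenSystem, and demo are hypothetical. The leading comment lines are scala-cli directives.

```scala
//> using scala 2.13
//> using dep com.typesafe.akka::akka-stream:2.6.20

import akka.Done
import akka.actor.ActorSystem
import akka.stream.{KillSwitch, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper object, not part of Akka.
object GracefulShutdown {

  /** Stops the stream, waits for it to complete, then terminates the system.
    * The system is terminated even if the stream failed or timed out.
    */
  def stopStreamThenSystem(
      kill: KillSwitch,
      done: Future[Any],
      system: ActorSystem,
      timeout: FiniteDuration
  ): Unit =
    try {
      kill.shutdown()             // only initiates completion
      Await.result(done, timeout) // rethrows a stream failure, if any
      ()
    } finally {
      Await.result(system.terminate(), timeout)
      ()
    }

  /** Runs an endless tick source and shuts everything down in order. */
  def demo(): (ActorSystem, Future[Done]) = {
    implicit val system: ActorSystem = ActorSystem("graceful-demo")
    val (kill, done) = Source
      .tick(10.millis, 10.millis, 1)
      .viaMat(KillSwitches.single[Int])(Keep.right)
      .toMat(Sink.ignore)(Keep.both)
      .run()
    stopStreamThenSystem(kill, done, system, 5.seconds)
    (system, done)
  }
}
```

The try/finally is a design choice: a failed or slow stream should not leave the actor system running. Whether a single timeout for both steps is appropriate depends on your application.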
To answer the question in the comments: the kill switch needs to be integrated as close to the source as possible. It has to come before all of the side effects; in my example these are the map stage and the sink, because of println. In the real world, a side effect can be storing a value in a database or sending a message to an external queue or topic.
The kill switch emits a completion signal, as if the source had finished, and this signal must travel downstream to initiate the termination logic of each flow and sink.
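The effect of the placement can be observed with a small experiment. This is a sketch under assumptions (Scala 2.13, akka-stream 2.6; the names KillSwitchPlacement, switchNearSource, and switchAfterGrouped are hypothetical, and the leading comment lines are scala-cli directives). It feeds six elements into grouped(4) and triggers the kill switch once all six have passed the counting stage, with the switch placed either right after the source or after grouped.

```scala
//> using scala 2.13
//> using dep com.typesafe.akka::akka-stream:2.6.20

import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import java.util.concurrent.CountDownLatch
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not taken from the answer above.
object KillSwitchPlacement {
  type Groups = immutable.Seq[immutable.Seq[Int]]

  // Emits 1..6 and then stays open, like a long-running source.
  private def openEndedSource = Source(1 to 6).concat(Source.maybe[Int])

  private def run(nearSource: Boolean): Groups = {
    implicit val system: ActorSystem = ActorSystem("placement-demo")
    val seen = new CountDownLatch(6)
    // Counts elements just before they enter grouped.
    val counted = Flow[Int].map { i => seen.countDown(); i }
    try {
      val (kill, result) =
        if (nearSource)
          openEndedSource
            .viaMat(KillSwitches.single[Int])(Keep.right)
            .via(counted)
            .grouped(4)
            .toMat(Sink.seq)(Keep.both)
            .run()
        else
          openEndedSource
            .via(counted)
            .grouped(4)
            .viaMat(KillSwitches.single[immutable.Seq[Int]])(Keep.right)
            .toMat(Sink.seq)(Keep.both)
            .run()
      seen.await()    // all six elements have reached grouped
      kill.shutdown() // completes downstream, cancels upstream
      Await.result(result, 5.seconds)
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }

  def switchNearSource(): Groups = run(nearSource = true)
  def switchAfterGrouped(): Groups = run(nearSource = false)
}
```

With the switch near the source, completion passes through grouped, which flushes its partial group, so the result should be the groups (1, 2, 3, 4) and (5, 6). With the switch after grouped, the stage is cancelled from downstream and the buffered 5 and 6 are dropped, leaving only (1, 2, 3, 4).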