I have a Spring application that makes use of a small Akka actor system (using Java), where I have a MasterActor that extends Akka's AbstractActor, initialises a Router and sets up a few worker actors. It also watches the lifecycle of the workers. I want to restart a worker actor if it happens to die because of some Exception.
```java
public MasterActor(ActorPropsFactory actorPropsFactory) {
    this.actorPropsFactory = actorPropsFactory;
    // One worker per core, leaving one core free, but never zero workers on a single-core machine
    int workers = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
    List<Routee> routees = Stream.generate(this::createActorRefRoutee)
            .limit(workers)
            .collect(Collectors.toList());
    this.router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees);
}

// Creates a worker as a child of this actor and watches it, so this actor receives Terminated when it stops
private ActorRefRoutee createActorRefRoutee() {
    ActorRef worker = getContext().actorOf(actorPropsFactory.create(getWorkerActorClass()));
    getContext().watch(worker);
    return new ActorRefRoutee(worker);
}

// Wraps the message so ConsistentHashingRoutingLogic can pick a routee by its key
private void route(Object message, Supplier<String> routingKeySupplier) {
    String routingKey = routingKeySupplier.get();
    RouterEnvelope envelope = new ConsistentHashingRouter.ConsistentHashableEnvelope(message, routingKey);
    router.route(envelope, getSender());
}

@Override
public Receive createReceive() {
    return receiveBuilder()
            .match(
                    EventMessage.class,
                    message -> this.route(message, () -> message.getEvent().getId().toString()))
            .match(
                    Terminated.class,
                    message -> {
                        logger.info("WorkerActor {} terminated, restarting", message.getActor());
                        // todo: detect whether the system is shutting down before restarting the actor
                        router = router.removeRoutee(message.getActor())
                                .addRoutee(createActorRefRoutee());
                    })
            .build();
}
```
The problem I am having is that if the Spring application fails to start up (for example, it fails to connect to the database, or some credentials are incorrect), I receive the Terminated message from all workers, and the master actor tries to start new ones, which are also terminated immediately, so it goes into an endless loop.
What is the correct way to detect such a scenario? Is there a way for the master actor to detect that the actor system is shutting down, so that the workers are not restarted again?
Can't you just set up a supervision strategy for your router, so you can inspect the type of exception that caused the failure? That way you also don't need to restart your workers manually.
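To make "inspect the type of exception" concrete, here is a plain-Java model (not Akka code) of how a decider built from chained DeciderBuilder.match(...) calls chooses a directive: rules are tried in order, the first exception type that matches (subclasses included) wins, and anything unmatched falls through to a default. The Directive enum, the RULES map and the decide method are hypothetical names for illustration; the mapping mirrors the OneForOneStrategy example in the EDIT.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DeciderSketch {
    // Hypothetical stand-in for Akka's supervisor directives
    enum Directive { RESUME, RESTART, STOP, ESCALATE }

    // Insertion-ordered rules: the first matching exception type wins,
    // like chained DeciderBuilder.match(...) calls
    private static final Map<Class<? extends Throwable>, Directive> RULES = new LinkedHashMap<>();

    static {
        RULES.put(ArithmeticException.class, Directive.RESUME);
        RULES.put(NullPointerException.class, Directive.RESTART);
        RULES.put(IllegalArgumentException.class, Directive.STOP);
    }

    static Directive decide(Throwable failure) {
        for (Map.Entry<Class<? extends Throwable>, Directive> rule : RULES.entrySet()) {
            // isInstance also matches subclasses of the registered type
            if (rule.getKey().isInstance(failure)) {
                return rule.getValue();
            }
        }
        // Plays the role of matchAny(o -> SupervisorStrategy.escalate())
        return Directive.ESCALATE;
    }

    public static void main(String[] args) {
        System.out.println(decide(new NullPointerException()));            // RESTART
        System.out.println(decide(new NumberFormatException("bad")));      // STOP (a subclass of IllegalArgumentException)
        System.out.println(decide(new IllegalStateException("db down")));  // ESCALATE
    }
}
```

The point for the question: a failure that signals "the application cannot start" (for example a connection or credentials error) can be mapped to stop or escalate instead of restart, so it does not feed the restart loop.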
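EDIT:
You can set up a SupervisorStrategy like this. The example attaches it to a RoundRobinPool; since your workers are children of MasterActor (created with getContext().actorOf), you would instead return the strategy from an overridden supervisorStrategy() method in MasterActor.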
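```java
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.routing.RoundRobinPool;
import java.time.Duration;

// At most 10 restarts per child within 1 minute; if that limit is exceeded, the child is stopped
private static SupervisorStrategy strategy =
        new OneForOneStrategy(
                10,
                Duration.ofMinutes(1),
                DeciderBuilder.match(ArithmeticException.class, e -> SupervisorStrategy.resume())
                        .match(NullPointerException.class, e -> SupervisorStrategy.restart())
                        .match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())
                        .matchAny(o -> SupervisorStrategy.escalate())
                        .build());

// `system` is an existing ActorSystem and Echo is the routee actor class
final ActorRef router =
        system.actorOf(
                new RoundRobinPool(5).withSupervisorStrategy(strategy).props(Props.create(Echo.class)));
```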
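The 10 and Duration.ofMinutes(1) arguments are what break an endless restart loop: once a child has been restarted too often within the time window, the strategy gives up and stops it. Below is a hypothetical plain-Java model of such a restart budget, written as a sliding window; the class and method names are illustrative, and Akka's internal bookkeeping is not necessarily identical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

public class RestartBudget {
    private final int maxRetries;
    private final Duration window;
    // Timestamps of the restarts that were allowed, oldest first
    private final Deque<Instant> restarts = new ArrayDeque<>();

    public RestartBudget(int maxRetries, Duration window) {
        this.maxRetries = maxRetries;
        this.window = window;
    }

    /** Returns true if a restart is allowed at {@code now}; false means "stop the child instead". */
    public boolean tryRestart(Instant now) {
        Instant cutoff = now.minus(window);
        // Forget restarts that are no longer inside the window
        while (!restarts.isEmpty() && !restarts.peekFirst().isAfter(cutoff)) {
            restarts.pollFirst();
        }
        if (restarts.size() >= maxRetries) {
            return false;
        }
        restarts.addLast(now);
        return true;
    }

    public static void main(String[] args) {
        RestartBudget budget = new RestartBudget(10, Duration.ofMinutes(1));
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        int allowed = 0;
        // A worker that fails every 100 ms, like the startup-failure loop in the question
        for (int i = 0; i < 50; i++) {
            if (budget.tryRestart(t0.plusMillis(100L * i))) {
                allowed++;
            }
        }
        System.out.println(allowed); // 10
    }
}
```

Note that a worker stopped this way still delivers Terminated to MasterActor, because MasterActor watches it. With the handler from the question, that would re-create the worker and the loop would simply continue there, so the manual re-creation in the Terminated handler would have to be removed when relying on the strategy.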
You can read more about it here: