javaakkaaggregationintegration-patterns

Java Aggregator for Akka


I am trying to implement a Java Aggregator for Akka, since it doesn't look like the Java API supports them (why not!?)

Here's my best attempt thus far:

// Groovy pseudo-code
abstract class Aggregator<T> extends UntypedActor {
    ActorRef recipient
    Set<T> aggregation
    // TODO: Timer timer (?)

    abstract boolean isAggregated()

    @Override
    void onReceive(Object message) {
        aggregation << message as T

        if(isAggregated()) {
            recipient.tell(new Aggregation(aggregation))    // again, pseudo-code
            aggregation.clear()
            // TODO: timer.reset()
        }
    }
}

What is missing is some kind of a Timer construct that will time the Aggregator out after, say, 60 seconds if it has not aggregated yet. On timeout, it should throw some exception of some sort. On aggregation, the timer should be reset. Any ideas how to do this?


Solution

  • What you are looking for is ReceiveTimeout. Akka provides a feature to have a timeout when a particular actor has not received anything in a predefined amount of time.

    In Java you would do something like this inside your actor:

    getContext().setReceiveTimeout(Duration.create("1 second"));
    

    When this trigger it sends a message of type ReceiveTimeout to the actor and then you can decide what you want to do (exceptions, logging, resetting...).

    You can find more information here under the section 'Receive timeout': http://doc.akka.io/docs/akka/snapshot/java/untyped-actors.html

    On the other hand, there are open-source libraries to do these kind of things available in github. Take a look to https://github.com/sksamuel/akka-patterns for more examples.