I am trying to implement an Aggregator for Akka in Java, since it doesn't look like the Java API supports one (why not!?).
Here's my best attempt thus far:
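    // Groovy pseudo-code
    import akka.actor.ActorRef
    import akka.actor.UntypedActor

    abstract class Aggregator<T> extends UntypedActor {
        ActorRef recipient
        Set<T> aggregation = new HashSet<T>() // initialized so << does not hit null
        // TODO: Timer timer (?)

        // Subclasses decide when enough messages have been collected
        abstract boolean isAggregated()

        @Override
        void onReceive(Object message) {
            // Parenthesized because << binds tighter than `as`
            aggregation << (message as T)
            if (isAggregated()) {
                // Send a copy, since the set is cleared right after
                recipient.tell(new Aggregation(new HashSet<T>(aggregation)), getSelf()) // again, pseudo-code
                aggregation.clear()
                // TODO: timer.reset()
            }
        }
    }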
What is missing is some kind of Timer construct that will time the Aggregator out after, say, 60 seconds if it has not aggregated yet. On timeout, it should throw an exception of some sort. On aggregation, the timer should be reset. Any ideas how to do this?
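
A minimal sketch of the timeout semantics described above, in plain Java rather than Akka. The class `TimedAggregation`, its methods, and the explicit `nowMillis` parameter are all hypothetical and exist only to pin down the intended behaviour: throw if the deadline passes without an aggregation, and reset the deadline whenever an aggregation completes. How to drive the check from inside an actor (for example, by some scheduled message to self) is exactly the open part of the question and is not shown.

```java
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical plain-Java model of the desired timeout behaviour.
// It is not Akka code: the caller passes the current time in explicitly,
// where an actor would presumably react to some timer event instead.
class TimedAggregation<T> {
    private final long timeoutMillis;
    private final Predicate<Set<T>> isAggregated;
    private final Set<T> aggregation = new LinkedHashSet<>();
    private long deadline;

    TimedAggregation(long timeoutMillis, Predicate<Set<T>> isAggregated, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.isAggregated = isAggregated;
        this.deadline = nowMillis + timeoutMillis;
    }

    // Throws if the deadline passed before an aggregation completed.
    void checkTimeout(long nowMillis) {
        if (nowMillis >= deadline) {
            throw new IllegalStateException(
                "no aggregation within " + timeoutMillis + " ms");
        }
    }

    // Adds a message; once the batch is complete, returns a copy of it
    // and resets the deadline. Returns empty while still collecting.
    Optional<Set<T>> receive(T message, long nowMillis) {
        checkTimeout(nowMillis);
        aggregation.add(message);
        if (!isAggregated.test(aggregation)) {
            return Optional.empty();
        }
        Set<T> batch = new LinkedHashSet<>(aggregation); // copy before clearing
        aggregation.clear();
        deadline = nowMillis + timeoutMillis; // timer reset on aggregation
        return Optional.of(batch);
    }
}
```

With a 60-second timeout and a "two messages make a batch" rule, two calls to `receive` inside the window return the batch and push the deadline out by another 60 seconds, while a later `checkTimeout` at or past that new deadline throws.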