I have implemented an application with the Akka framework in Java. A main actor calls a sub-actor using the 'ask' pattern with a 60-second timeout; when the worker receives the message from the main actor, it calls a method on another Java class.

The problem is that even after the main actor times out at 60 seconds, the worker keeps talking to the Java class method, which goes on performing operations that are no longer needed: because of the timeout, the main actor cannot receive the response even if the sub-actor returns one.

Is there any way to kill the worker or stop it from processing further when the main actor times out? I have looked at ReceiveTimeout, context.stop() and PoisonPill, but none of them helped.

Appreciate your support

Code Below

public class MainActor extends UntypedActor {

    ActorRef subActorRef;

    final Timeout timeout = new Timeout(Duration.create(60, TimeUnit.SECONDS));

    @Override
    public void preStart() {
        subActorRef = getContext().actorOf(
            SpringExtProvider.get(actorSystem).props(
                "SubActor"), "subActor");
    }

    @Override
    public void onReceive(Object message) throws Exception {

        if (message instanceof BusinessRequestVO) {
            BusinessRequestVO requestVo = (BusinessRequestVO) message;
            ArrayList<Future<Object>> responseFutures = new ArrayList<Future<Object>>();

            // This ask times out after 60 seconds
            responseFutures.add(ask(subActorRef,requestVo, timeout));
        }
    }
}

SubActor class

public class SubActor extends UntypedActor {

    @Resource
    @Inject
    ServiceAdapter serviceAdapter;

    @Override
    public void onReceive(Object message) throws Exception {
        try {
            if (message instanceof BusinessRequestVO) {
                BusinessRequestVO requestVo = (BusinessRequestVO) message;

                // There is no timeout here, so this call blocks synchronously
                // even after the main actor has timed out
                ServiceResponse response = serviceAdapter.getWorkOrder(requestVo);

                getSender().tell(response, ActorRef.noSender());
            }
        } catch (Exception e) {
            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
            throw e;
        }
    }
}

Adapter Class

public class ServiceAdapterImpl implements ServiceAdapter{
    public ServiceResponse getWorkOrder(BusinessRequestVO request){
        // Some code here along with webservice calls
    }
}
Chris Martin
Rajesh B

1 Answer

You can't, because your child actor is blocking and therefore cannot process any "stop" message the parent sends it (an actor processes one message at a time and only reads the next one from its mailbox once the current one is done).
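
To make the one-message-at-a-time point concrete, here is a minimal sketch in plain Java, without Akka. A single-threaded executor stands in for the actor's mailbox; the class name MailboxDemo and the 300 ms sleep are made up for illustration and are not part of the original code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class MailboxDemo {

    // Returns the order in which the two "messages" were handled.
    static List<String> run() {
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());
        // One thread stands in for the actor: messages are handled strictly in order.
        ExecutorService mailbox = Executors.newSingleThreadExecutor();

        // First "message": a blocking call, like serviceAdapter.getWorkOrder(...)
        mailbox.submit(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(300); // stand-in for the slow web service call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                events.add("work finished");
            }
        });

        // Second "message": the stop request, queued behind the blocking one.
        mailbox.submit(new Runnable() {
            public void run() {
                events.add("stop handled");
            }
        });

        mailbox.shutdown();
        try {
            mailbox.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [work finished, stop handled]
    }
}
```

The stop request is only looked at after the blocking work has finished, which is the same reason a PoisonPill or a custom stop message does not interrupt the child here.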

Your best bet is to wrap the "slow" part of the child's execution in a future that you can pipe to the parent with pipeTo (see here for details).

That way, if the timeout expires, the parent can send a custom "stop computing" message and the child actor can terminate the future. See here for how to terminate a future.

Be aware that this could leave your application logic in a "dirty" state, depending on which computation gets terminated midway through execution.

On a related note: why are you sending all n requests to the same child actor (which you made blocking)? That is equivalent to sequential execution. You should either make the child actor non-blocking or (better) create a blocking actor for each request.
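
As a rough illustration of why a single blocking worker amounts to sequential execution, the plain-Java sketch below (no Akka) uses a thread pool in place of the actors. The class WorkerPerRequest, the request count of 3 and the 500 ms wait are assumptions made for the example. Each "request" waits at a barrier that only opens once all requests are in flight at the same time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class WorkerPerRequest {
    static final int REQUESTS = 3;

    // True only if all requests were in flight at the same time.
    static boolean ranConcurrently(int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        final CyclicBarrier barrier = new CyclicBarrier(REQUESTS);
        List<Future<Boolean>> results = new ArrayList<Future<Boolean>>();
        try {
            for (int i = 0; i < REQUESTS; i++) {
                results.add(pool.submit(new Callable<Boolean>() {
                    public Boolean call() {
                        try {
                            // Passes only once every request has started.
                            barrier.await(500, TimeUnit.MILLISECONDS);
                            return true;
                        } catch (Exception e) {
                            return false; // timed out or barrier broken
                        }
                    }
                }));
            }
            boolean all = true;
            for (Future<Boolean> f : results) {
                all &= f.get();
            }
            return all;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(ranConcurrently(1));        // one blocking worker: false
        System.out.println(ranConcurrently(REQUESTS)); // one worker per request: true
    }
}
```

With one worker the requests can never overlap, so the barrier times out; with one worker per request they all proceed together.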

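EDIT: as requested by the OP, I've added a snippet. I mainly use futures from Scala and am no expert in the Java syntax, so treat this as a sketch and adapt it as needed:

if (message instanceof BusinessRequestVO) {
  final BusinessRequestVO requestVo = (BusinessRequestVO) message;
  // Capture the sender now: getSender() must not be called from the future's thread
  final ActorRef sender = getSender();
  // Ideally use a dedicated dispatcher for blocking work instead of the default one
  final ExecutionContext ec = getContext().dispatcher();

  // akka.dispatch.Futures.future runs the blocking call off the actor's thread
  Future<ServiceResponse> future = Futures.future(new Callable<ServiceResponse>() {
    public ServiceResponse call() throws Exception {
      return serviceAdapter.getWorkOrder(requestVo);
    }
  }, ec);

  // akka.pattern.Patterns.pipe sends the result, or a Status.Failure, to the sender
  Patterns.pipe(future, ec).to(sender);
}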

And in the main actor (see here for Java's Future.cancel):

if (timeout) future.cancel(true)
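
For the plain-Java side of this, here is a minimal sketch of wrapping an ordinary method call in a java.util.concurrent.Future, waiting with a timeout, and cancelling on expiry. The names CancellableCall, slowCall and callWithTimeout are hypothetical; slowCall merely stands in for serviceAdapter.getWorkOrder(requestVo), whose real behaviour is not shown in the question.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CancellableCall {

    // Hypothetical stand-in for serviceAdapter.getWorkOrder(requestVo).
    static String slowCall(long millis) throws InterruptedException {
        Thread.sleep(millis); // throws InterruptedException when cancelled with cancel(true)
        return "response";
    }

    // Runs slowCall on a worker thread; returns null if it did not finish in time.
    static String callWithTimeout(final long workMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new Callable<String>() {
            public String call() throws Exception {
                return slowCall(workMillis);
            }
        });
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker thread
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(callWithTimeout(50, 2000));  // finishes in time: response
        System.out.println(callWithTimeout(2000, 100)); // times out and is cancelled: null
    }
}
```

Note that cancel(true) only interrupts the thread. It stops the work only if the blocking code reacts to interruption, which Thread.sleep does but many blocking I/O calls do not, so a web service client may also need its own connect and read timeouts.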
Diego Martinoia
  • Thanks for the reply, but I am already using a future in the main actor, and that one times out. In the sub-actor there is no way to time out, because it calls a plain Java method of another class. 1. Can I use a future to call plain Java class operations, or does this only work for Akka actor/sub-actor ask/tell? 2. Java concurrency has a Future too; is it similar to Akka's? Please suggest a code snippet that times out the following call: ServiceResponse response = serviceAdapter.getWorkOrder(requestVo); – Rajesh B Dec 08 '14 at 05:59
  • Futures are not from Akka; they are a Java/Scala thing. You can wrap a lot of things in a future, not just asks, but with asks you can easily pipe the future to the sender. Akka futures are the Java/Scala futures. – Diego Martinoia Dec 09 '14 at 08:21