
In my scenario I have 2 actors:

  1. watchee (I use TestProbe)
  2. watcher (Watcher wrapped into TestActorRef to expose some internal state I track in my test)

Watcher should take some actions when watchee dies.

Here is the complete test case I've written so far:

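import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}
import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike, Matchers}

class TempTest(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with FunSuiteLike with Matchers with BeforeAndAfterAll {

  def this() = this(ActorSystem("TempTest"))

  override def afterAll(): Unit = {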
    TestKit.shutdownActorSystem(system)
  }

  class WatcherActor(watchee: ActorRef) extends Actor {

    var state = "initial"
    context.watch(watchee)

    override def receive: Receive = {
      case "start" =>
        state = "start"
      case _: Terminated =>
        state = "terminated"
    }

  }

  test("example") {
    val watchee = TestProbe()
    val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))

    assert(watcher.underlyingActor.state === "initial")

    watcher ! "start" // "start" will be sent and handled by watcher synchronously
    assert(watcher.underlyingActor.state === "start")

    system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
    Thread.sleep(100) // what is the best way to avoid blocking here?
    assert(watcher.underlyingActor.state === "terminated")
  }

}

Now, since all involved actors use CallingThreadDispatcher (all of Akka's test helpers get constructed using props with .withDispatcher(CallingThreadDispatcher.Id)), I can safely assume that when this statement returns:

watcher ! "start"

... the "start" message has already been processed by WatcherActor, and thus I can make assertions based on watcher.underlyingActor.state.

However, based on my observations, when I stop the watchee using system.stop or by sending Kill to it, the Terminated message produced as a side effect of the watchee's death gets handled asynchronously, in another thread.

A non-solution is to stop the watchee, block the thread for some time, and verify the watcher's state after that. I'd like to know how to do this the right way, i.e. how to be sure that, after killing an actor, its watcher has received and processed the Terminated message signaling its death.

Eugene Loy

2 Answers


One way to fix this issue is to introduce another watcher in your test that also watches the watchee. This other watcher is a TestProbe which will allow us to perform an assertion on it that will get rid of the timing issues you are seeing. First, the modified test code:

 val watchee = TestProbe()
 val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))
 val probeWatcher = TestProbe()
 probeWatcher watch watchee.ref

 assert(watcher.underlyingActor.state === "initial")

 watcher ! "start" // "start" will be sent and handled by watcher synchronously
 assert(watcher.underlyingActor.state === "start")

 system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
 probeWatcher.expectTerminated(watchee.ref)
 assert(watcher.underlyingActor.state === "terminated")

So you can see that I have introduced the additional watcher with the lines:

val probeWatcher = TestProbe()
probeWatcher watch watchee.ref

Then, later in the code, before the final assertion that is failing for you, I use another assertion that tells me the Terminated message for the stopped actor has been delivered:

probeWatcher.expectTerminated(watchee.ref)

When the code moves past this line, I can be sure that the watcher under test has also received its Terminated message, and the assertion that follows will pass.

EDIT

As noted by the OP, there is a level of non-determinism with this code. Another possible solution is to change the line in the test code that stops the actor to:

watcher.underlyingActor.context.stop(watchee.ref)

By using the context of the TestActorRef, I believe the Terminated will be delivered entirely via the CallingThreadDispatcher and thus be completely synchronous. I tested this in a loop and it worked for me over 1000 iterations.

I then wondered whether, because I was performing the stop from the same actor that was expecting the Terminated, there might be an optimization that delivers the Terminated to self in that scenario, so I also tested this with a completely different actor, as follows:

class FooActor extends Actor {
  // Ignores every message; it exists only to provide a context to stop from
  def receive = {
    case _ =>
  }
}

Then in the test code:

val foo = TestActorRef(new FooActor)

And on the stopping:

foo.underlyingActor.context.stop(watchee.ref)

This also worked as expected.

cmbaxter
  • I am afraid this is not a solution. Rebuilding the test the way you suggest introduces a race condition (what exactly guarantees that the `Terminated` sent to `probeWatcher` will be processed after the `Terminated` sent to `watcher`?). You can see it by running the code you've provided in a loop. – Eugene Loy Oct 20 '14 at 18:22

EDIT: After discussion and testing with the OP, we found that sending PoisonPill as a means of terminating the watched actor achieves the desired behavior: the Terminated messages resulting from a PoisonPill are processed synchronously, while those from stop or Kill are processed asynchronously.

While we are not sure about the reason, our best guess is that killing an actor raises an exception while sending it a PoisonPill does not.

Apparently, this has nothing to do with using the gracefulStop pattern as per my original suggestion, which I'll leave below.

In summary, the solution to the OP's problem was simply to terminate the watched actor by sending it a PoisonPill instead of sending a Kill message or calling system.stop.
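
A minimal sketch of the PoisonPill variant of the test from the question. It assumes classic Akka with akka-testkit on the classpath and a ScalaTest version that still provides FunSuiteLike, as the question's code does. The spec name PoisonPillWatchSpec is made up for illustration, and the synchronous handling of Terminated is the behavior observed in this thread, not something I can point to as a documented guarantee.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props, Terminated}
import akka.testkit.{TestActorRef, TestKit, TestProbe}
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}

// Same watcher as in the question: records the last event it has seen.
class WatcherActor(watchee: ActorRef) extends Actor {
  var state = "initial"
  context.watch(watchee)

  override def receive: Receive = {
    case "start"       => state = "start"
    case _: Terminated => state = "terminated"
  }
}

// Hypothetical spec name, chosen for this example only.
class PoisonPillWatchSpec(_system: ActorSystem)
    extends TestKit(_system) with FunSuiteLike with BeforeAndAfterAll {

  def this() = this(ActorSystem("PoisonPillWatchSpec"))

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  test("watcher handles Terminated after the watchee gets a PoisonPill") {
    val watchee = TestProbe()
    val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))

    watcher ! "start" // handled on the calling thread
    assert(watcher.underlyingActor.state === "start")

    // PoisonPill instead of system.stop(watchee.ref) or Kill.
    // Assumption taken from this thread: the resulting Terminated is
    // handled by the watcher before this send returns.
    watchee.ref ! PoisonPill
    assert(watcher.underlyingActor.state === "terminated")
  }
}
```

Compared with the original test, the only changes are the stop mechanism and the removal of Thread.sleep. If this ever turns out flaky in your Akka version, running the test body in a loop (as was done for the other variants in this thread) is a cheap way to check.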


Old answer begins here:

I may be going to suggest something a bit unrelated, but I feel it may apply.

If I understood correctly, what you want to do is basically terminate an actor synchronously, i.e. do something that returns only once the actor is officially dead and its death has been recorded (in your case by the watcher).

In general, death notification, like most things in Akka, is async. Nonetheless, it's possible to obtain synchronous death confirmation using the gracefulStop pattern (akka.pattern.gracefulStop).

To do so, the code should be something similar to:

import akka.pattern.gracefulStop
import scala.concurrent.Await
import scala.concurrent.duration._
val timeout = 5.seconds
val killResultFuture = gracefulStop(victimRef, timeout, PoisonPill)
Await.result(killResultFuture, timeout)

This sends a PoisonPill to the victim (note: you can use a custom message) and returns a future that is completed once the victim has terminated. Using Await.result makes the call synchronous.

Unfortunately, this is only usable if a) you are actively killing the victim rather than responding to an external cause of death, and b) you can use timeouts and blocking in your code. But maybe you can adapt this pattern to your situation.

Diego Martinoia
  • Tried your snippet and it worked. The funny thing is that it suffers from pretty much the same problem as @cmbaxter's initial answer (awaiting the result of `gracefulStop` guarantees to return after the target actor's `postStop`, not after all `Terminated` messages produced by the target actor's death are consumed). I bent my head trying to understand why your snippet works, and it turned out that it is not `gracefulStop` that makes it work but the fact that you send `PoisonPill` to terminate the actor. – Eugene Loy Oct 24 '14 at 07:35
  • For some reason the `Terminated` messages produced by an actor's death from `PoisonPill` get processed synchronously, while the ones produced by death from `Kill` or `system.stop` get processed asynchronously. I have not figured out why yet, but in the end it means that one can simply send `PoisonPill` to the `watchee` to achieve my goal (knowing when the actor has died and all `watcher`s have been notified) and that `gracefulStop` has nothing to do with solving this issue (in fact, using `gracefulStop` with `Kill` as the message introduces the race condition I talked about in the comment above). – Eugene Loy Oct 24 '14 at 07:44
  • I'm unsure about it too. Could it be because Kill throws an exception while PoisonPill doesn't? – Diego Martinoia Oct 24 '14 at 19:40
  • It is most likely that this is the reason, though I have not verified this yet. – Eugene Loy Oct 27 '14 at 12:35
  • I am going to award the bounty to this post, since it (while not directly addressing my issue and leaving some questions open) pointed me to the solution I am going to use. Also, upvoting the rest of the current answers for the same reason. Diego, please edit this answer to reflect the fact that sending `PoisonPill` works and I will also accept it. – Eugene Loy Oct 27 '14 at 12:57
  • Done. I also left the previous answer there so that people can make sense of the comments we posted. If you feel it's better to completely remove the previous version, just let me know (I'm kind of new here on SO). – Diego Martinoia Oct 27 '14 at 16:02