I'm attempting to send a message when an actor is killed.
This is based on Akka deathwatch documentation : http://doc.akka.io/docs/akka/2.3.6/java/untyped-actors.html#deathwatch-java
In serviceActor I'm awaiting a "kill" message but I'm never actually sending this message. So to receive the message in ServiceActor I use :
else if (msg instanceof Terminated) {
final Terminated t = (Terminated) msg;
if (t.getActor() == child) {
lastSender.tell(Msg.TERMINATED, getSelf());
}
} else {
unhandled(msg);
}
I've set the duration to 10 milliseconds :
Duration.create(10, TimeUnit.MILLISECONDS)
But the message Msg.TERMINATED
is never received in onReceive method :
@Override
public void onReceive(Object msg) {
if (msg == ServiceActor.Msg.SUCCESS) {
System.out.println("Success");
getContext().stop(getSelf());
} else if (msg == ServiceActor.Msg.TERMINATED) {
System.out.println("Terminated");
} else
unhandled(msg);
}
How can I send a message to HelloWorld when ServiceActor fails ?
Entire code :
package terminatetest;
import akka.Main;
public class Launcher {
public static void main(String args[]) {
String[] akkaArgsArray = new String[1];
akkaArgsArray[0] = "terminatetest.HelloWorld";
Main.main(akkaArgsArray);
}
}
package terminatetest;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.UntypedActor;
public class HelloWorld extends UntypedActor {
@Override
public void preStart() {
int counter = 0;
akka.actor.ActorSystem system = getContext().system();
final ActorRef greeter = getContext().actorOf(
Props.create(ServiceActor.class), String.valueOf(counter));
system.scheduler().scheduleOnce(
Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
public void run() {
greeter.tell(PoisonPill.getInstance(), getSelf());
}
}, system.dispatcher());
greeter.tell("http://www.google.com", getSelf());
counter = counter + 1;
}
@Override
public void onReceive(Object msg) {
if (msg == ServiceActor.Msg.SUCCESS) {
System.out.println("Success");
getContext().stop(getSelf());
} else if (msg == ServiceActor.Msg.TERMINATED) {
System.out.println("Terminated");
} else
unhandled(msg);
}
}
package terminatetest;
import static com.utils.PrintUtils.println;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
public class ServiceActor extends UntypedActor {
final ActorRef child = this.getContext().actorOf(Props.empty(), "child");
{
this.getContext().watch(child);
}
ActorRef lastSender = getContext().system().deadLetters();
public static enum Msg {
SUCCESS, FAIL, TERMINATED;
}
@Override
public void onReceive(Object msg) {
if (msg instanceof String) {
String urlName = (String) msg;
try {
long startTime = System.currentTimeMillis();
URL url = new URL(urlName);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.connect();
BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
StringBuilder out = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
out.append(line);
}
System.out.println("Connection successful to " + url);
System.out.println("Content is " + out);
long endTime = System.currentTimeMillis();
System.out.println("Total Time : " + (endTime - startTime) + " milliseconds");
} catch (MalformedURLException mue) {
println("URL Name " + urlName);
System.out.println("MalformedURLException");
System.out.println(mue.getMessage());
mue.printStackTrace();
getSender().tell(Msg.FAIL, getSelf());
} catch (IOException ioe) {
println("URL Name " + urlName);
System.out.println("IOException");
System.out.println(ioe.getMessage());
ioe.printStackTrace();
System.out.println("Now exiting");
getSender().tell(Msg.FAIL, getSelf());
}
}
else if (msg instanceof Terminated) {
final Terminated t = (Terminated) msg;
if (t.getActor() == child) {
lastSender.tell(Msg.TERMINATED, getSelf());
}
} else {
unhandled(msg);
}
}
}
Update : I'm now initiating the poisonPill from the child actor itself using :
Update to ServiceActor :
if (urlName.equalsIgnoreCase("poisonPill")) {
this.getSelf().tell(PoisonPill.getInstance(), getSelf());
getSender().tell(Msg.TERMINATED, getSelf());
}
Update to HelloWorld :
system.scheduler().scheduleOnce(
Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
public void run() {
greeter.tell("poisonPill", getSelf());
}
}, system.dispatcher());
This displays following output :
startTime : 1412777375414
Connection successful to http://www.google.com
Content is ....... (I'veremoved the content for brevity)
Total Time : 1268 milliseconds
Terminated
The poisonPill message is sent after 10 milliseconds and for this example the actor lives for 1268 milliseconds. So why is the actor not terminating when the poisonPill is sent ? Is this because the timings are so short ?
Updated code :
package terminatetest;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
public class HelloWorld extends UntypedActor {
@Override
public void preStart() {
int counter = 0;
akka.actor.ActorSystem system = getContext().system();
final ActorRef greeter = getContext().actorOf(
Props.create(ServiceActor.class), String.valueOf(counter));
system.scheduler().scheduleOnce(
Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
public void run() {
greeter.tell("poisonPill", getSelf());
}
}, system.dispatcher());
greeter.tell("http://www.google.com", getSelf());
counter = counter + 1;
}
@Override
public void onReceive(Object msg) {
if (msg == ServiceActor.Msg.SUCCESS) {
System.out.println("Success");
getContext().stop(getSelf());
} else if (msg == ServiceActor.Msg.TERMINATED) {
System.out.println("Terminated");
} else
unhandled(msg);
}
}
package terminatetest;
import static com.utils.PrintUtils.println;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.UntypedActor;
public class ServiceActor extends UntypedActor {
ActorRef lastSender = getSender();
public static enum Msg {
SUCCESS, FAIL, TERMINATED;
}
@Override
public void onReceive(Object msg) {
if (msg instanceof String) {
String urlName = (String) msg;
if (urlName.equalsIgnoreCase("poisonPill")) {
this.getSelf().tell(PoisonPill.getInstance(), getSelf());
getSender().tell(Msg.TERMINATED, getSelf());
}
else {
try {
long startTime = System.currentTimeMillis();
System.out.println("startTime : "+startTime);
URL url = new URL(urlName);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.connect();
BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
StringBuilder out = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
out.append(line);
}
System.out.println("Connection successful to " + url);
System.out.println("Content is " + out);
long endTime = System.currentTimeMillis();
System.out.println("Total Time : " + (endTime - startTime) + " milliseconds");
} catch (MalformedURLException mue) {
println("URL Name " + urlName);
System.out.println("MalformedURLException");
System.out.println(mue.getMessage());
mue.printStackTrace();
getSender().tell(Msg.FAIL, getSelf());
} catch (IOException ioe) {
println("URL Name " + urlName);
System.out.println("IOException");
System.out.println(ioe.getMessage());
ioe.printStackTrace();
System.out.println("Now exiting");
getSender().tell(Msg.FAIL, getSelf());
}
}
}
}
}