I have an application with a Master-Worker architecture. The Worker has to run a heavy, long-running job, and the Master needs to be able to kill that job on demand.
I first tried running the job without a Future, but then the Worker cannot receive any messages while it is working. So I tried wrapping the job in a Future instead. However, after the Worker is stopped, the job keeps running. How can I release the resources after stopping the actor?
Here is the code:
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}
import scala.concurrent.Future
import scala.concurrent.duration._

object Main extends App {
  object StopTask
  case class DoTask(task: String)

  override def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClusterSystem")
    val master = system.actorOf(Props[Master], "master")
    master ! "FooTask"
    import system.dispatcher
    system.scheduler.scheduleOnce(5.seconds) {
      master ! StopTask
    }
  }

  class Master extends Actor {
    val worker: ActorRef = context.actorOf(Props[Worker], "worker")

    def receive: Receive = {
      case task: String => worker ! DoTask(task)
      case StopTask     => context stop worker
    }
  }

  class Worker extends Actor {
    import context.dispatcher

    override def postStop(): Unit = {
      println("Stopping task...")
    }

    def receive: Receive = {
      case DoTask(task) =>
        Future {
          // Heavy, long-running job here
          while (true) {
            println(s"Doing $task...")
            Thread.sleep(1000)
          }
        }
    }
  }
}
The output is:
[INFO ] 2018-04-08 21:48:33,947 akka.event.slf4j.Slf4jLogger - Slf4jLogger started
[INFO ] 2018-04-08 21:48:34,244 akka.remote.Remoting - Starting remoting
[INFO ] 2018-04-08 21:48:34,463 akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:49770]
[INFO ] 2018-04-08 21:48:34,466 akka.remote.Remoting - Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:49770]
[INFO ] 2018-04-08 21:48:34,521 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Starting up...
[INFO ] 2018-04-08 21:48:34,717 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Registered cluster JMX MBean [akka:type=Cluster,port=49770]
[INFO ] 2018-04-08 21:48:34,718 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Started up successfully
Doing FooTask...
[INFO ] 2018-04-08 21:48:34,777 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Metrics collection has started successfully
[INFO ] 2018-04-08 21:48:35,017 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2560]
Doing FooTask...
Doing FooTask...
Doing FooTask...
Doing FooTask...
Stopping task...
Doing FooTask...
Doing FooTask...
I have found a way to kill a Future, but I don't know how to integrate it into this architecture. Any help would be appreciated.
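For reference, here is a minimal sketch of the kind of integration I have in mind. It uses a cooperative cancellation flag rather than forcibly killing the Future. CancellableLoop and CancellableLoopDemo are hypothetical names of my own, not part of Akka or the Scala standard library, and the sketch assumes the job can be split into short steps that check the flag in between.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper (not part of Akka): runs `step` repeatedly inside a
// Future until cancel() is called, then completes with the step count.
final class CancellableLoop(step: () => Unit)(implicit ec: ExecutionContext) {
  private val cancelled = new AtomicBoolean(false)

  // The flag is checked between steps, so cancellation is cooperative:
  // a step that is already running finishes before the loop exits.
  val done: Future[Int] = Future {
    var steps = 0
    while (!cancelled.get()) {
      step()
      steps += 1
    }
    steps
  }

  def cancel(): Unit = cancelled.set(true)
}

object CancellableLoopDemo {
  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val loop = new CancellableLoop(() => Thread.sleep(10))
    Thread.sleep(100) // let the job run for a while
    loop.cancel()     // the call Worker.postStop() would make
    val steps = Await.result(loop.done, 1.second)
    println(s"Loop stopped after $steps steps")
  }
}
```

In the Worker above, the DoTask handler would presumably create such a loop with the println/Thread.sleep body as its step and keep the handle in a field, and postStop() would call cancel() on it. This only works if the job itself checks the flag regularly; a single blocking call that never returns would not be interrupted by it.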