I understand that using for comprehension on Futures we can make them run in parallel if the they are created outside the for
block.
For example:
val f1 = Cloud.generateNumber(1)
val f2 = Cloud.generateNumber(2)
val f3 = Cloud.generateNumber(3)
val future = for {
a <- f1
b <- f2
c <- Future.successful(5 + 2)
d <- f3
} yield {
addInts(a,b,c,d)
}
The above code snippet would run f1, f2, and f3 in parallel.
Now consider the following sippets.
driver:
object DriverApp {
def main(args: Array[String]): Unit = {
val system = ActorSystem("MyActorSystem")
val parent = system.actorOf(Parent.props)
implicit val timeout: Timeout = 30.seconds
(parent ? GetMessageRequest()).mapTo[GetMessageResponse].map { resp =>
println("received response in driver")
println(resp.message)
}
Thread.sleep(5000)
system.shutdown()
}
}
parent:
class Parent extends Actor with ActorLogging {
val system = ActorSystem("MyActorSystem")
val child1Actor = system.actorOf(Child1.props, "child1-actor")
val child2Actor = system.actorOf(Child2.props, "child2-actor")
implicit val timeout: Timeout = 5.seconds
override def receive: Receive = {
case getFinalMessageRequest: GetMessageRequest => getFinalMessage(sender, getFinalMessageRequest)
}
private def getFinalMessage(sender: ActorRef, getFinalMessageRequest: GetMessageRequest): Unit = {
val r1 = getMessageFromC1()
val r2 = getMessageFromC2()
for {
i1 <- r1
i2 <- r2
} yield {
println("received response")
println(i1)
println(i2)
sender ! GetMessageResponse((i1 + i2).toString)
}
}
private def getMessageFromC1(): Future[Int] = {
import scala.concurrent.ExecutionContext.Implicits.global
Thread.sleep(Random.nextInt(500))
println("Executing getMessageFromC1")
(child1Actor ? GetMessageRequest()).mapTo[Int].map(v => v)
}
private def getMessageFromC2(): Future[Int] = {
import scala.concurrent.ExecutionContext.Implicits.global
Thread.sleep(Random.nextInt(500))
println("Executing getMessageFromC2")
(child2Actor ? GetMessageRequest()).mapTo[Int].map(v => v)
}
}
child1:
object Child1 {
final val props = Props[Child1]
case class GetMessageRequest2()
}
class Child1 extends Actor with ActorLogging{
override def receive: Receive = {
case getMessageRequest: GetMessageRequest => getFinalMessage(sender, getMessageRequest)
}
private def getFinalMessage(sender: ActorRef, getFinalMessageRequest: GetMessageRequest): Unit = {
println("in child actor 1")
sender ! 5
}
}
child2:
object Child2 {
final val props = Props[Child2]
}
class Child2 extends Actor with ActorLogging {
override def receive: Receive = {
case getFinalMessageRequest: GetMessageRequest => getFinalMessage(sender, getFinalMessageRequest)
}
private def getFinalMessage(sender: ActorRef, getFinalMessageRequest: GetMessageRequest): Unit = {
println("in child actor 2")
sender ! 10
}
}
For the second example the futures are executed sequentially. I've tested the the app quite a few times, on every occasion it results in the following output
Executing getMessageFromC1
in child actor 1
Executing getMessageFromC2
in child actor 2
received response
5
10
received response in driver
15
Is there some restriction that keeps us from running futures in parallel within actors?