I understand that when using a for comprehension on Futures, they can run in parallel if they are created outside the for block.

For example:

val f1 = Cloud.generateNumber(1)
val f2 = Cloud.generateNumber(2)
val f3 = Cloud.generateNumber(3)

val future = for {
  a <- f1
  b <- f2
  c <- Future.successful(5 + 2)
  d <- f3
} yield {
  addInts(a,b,c,d)
}

The above code snippet would run f1, f2, and f3 in parallel.
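
To make the claim above concrete, here is a minimal sketch that only uses the standard library. `slowNumber` is a hypothetical stand-in for `Cloud.generateNumber`, and `timed` is a helper introduced just for this illustration. It contrasts futures created before the for block with futures created inside it. Whether the first variant really overlaps depends on the threads available to the execution context, so the timings are indicative only.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForComprehensionTiming {
  // Hypothetical stand-in for Cloud.generateNumber: the Future starts as soon as this is called.
  def slowNumber(n: Int): Future[Int] = Future { Thread.sleep(300); n }

  // Returns (result, elapsed milliseconds) for a block that builds a Future.
  def timed(body: => Future[Int]): (Int, Long) = {
    val start = System.nanoTime()
    val result = Await.result(body, 10.seconds)
    (result, (System.nanoTime() - start) / 1000000)
  }

  // Futures created up front: all three have been started before the for block runs.
  def createdOutside(): Future[Int] = {
    val f1 = slowNumber(1)
    val f2 = slowNumber(2)
    val f3 = slowNumber(3)
    for { a <- f1; b <- f2; c <- f3 } yield a + b + c
  }

  // Futures created inside: each one is started only after the previous one completes.
  def createdInside(): Future[Int] =
    for { a <- slowNumber(1); b <- slowNumber(2); c <- slowNumber(3) } yield a + b + c

  def main(args: Array[String]): Unit = {
    println(s"created outside: ${timed(createdOutside())}")
    println(s"created inside:  ${timed(createdInside())}")
  }
}
```

With enough free threads, the first variant should take roughly one sleep interval, while the second always takes at least three, because each `slowNumber` call is only reached after the previous future has completed.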

Now consider the following snippets.

driver:

object DriverApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("MyActorSystem")
    val parent = system.actorOf(Parent.props)

    implicit val timeout: Timeout = 30.seconds

    (parent ? GetMessageRequest()).mapTo[GetMessageResponse].map { resp =>
      println("received response in driver")
      println(resp.message)
    }

    Thread.sleep(5000)

    system.shutdown()
  }
}

parent:

class Parent extends Actor with ActorLogging {

  // Create the children in this actor's context instead of starting a second ActorSystem
  val child1Actor = context.actorOf(Child1.props, "child1-actor")
  val child2Actor = context.actorOf(Child2.props, "child2-actor")
  implicit val timeout: Timeout = 5.seconds

  override def receive: Receive = {
    case getFinalMessageRequest: GetMessageRequest => getFinalMessage(sender, getFinalMessageRequest)
  }

  private def getFinalMessage(sender: ActorRef, getFinalMessageRequest: GetMessageRequest): Unit = {
    val r1 = getMessageFromC1()
    val r2 = getMessageFromC2()

    for {
      i1 <- r1
      i2 <- r2
    } yield {
      println("received response")
      println(i1)
      println(i2)
      sender ! GetMessageResponse((i1 + i2).toString)
    }
  }

  private def getMessageFromC1(): Future[Int] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    Thread.sleep(Random.nextInt(500))
    println("Executing getMessageFromC1")
    (child1Actor ? GetMessageRequest()).mapTo[Int].map(v => v)
  }

  private def getMessageFromC2(): Future[Int] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    Thread.sleep(Random.nextInt(500))
    println("Executing getMessageFromC2")
    (child2Actor ? GetMessageRequest()).mapTo[Int].map(v => v)
  }
}

child1:

object Child1 {
  final val props = Props[Child1]

  case class GetMessageRequest2()
}

class Child1 extends Actor with ActorLogging {

  override def receive: Receive = {
    case getMessageRequest: GetMessageRequest => getFinalMessage(sender, getMessageRequest)
  }

  private def getFinalMessage(sender: ActorRef, getFinalMessageRequest: GetMessageRequest): Unit = {
    println("in child actor 1")
    sender ! 5
  }

}

child2:

object Child2 {
  final val props = Props[Child2]
}

class Child2 extends Actor with ActorLogging {

  override def receive: Receive = {
    case getFinalMessageRequest: GetMessageRequest => getFinalMessage(sender, getFinalMessageRequest)
  }

  private def getFinalMessage(sender: ActorRef, getFinalMessageRequest: GetMessageRequest): Unit = {
    println("in child actor 2")
    sender ! 10
  }
}

In the second example, the futures appear to execute sequentially. I've tested the app quite a few times, and every run produces the following output:

Executing getMessageFromC1
in child actor 1
Executing getMessageFromC2
in child actor 2
received response
5
10
received response in driver
15

Is there some restriction that keeps us from running futures in parallel within actors?

Jeffrey Chung
Utsav
  • Check out http://stackoverflow.com/questions/35849001/akka-future-parallel-versus-concurrent/35849295. – Kevin Meredith Nov 16 '16 at 19:08
  • Note that your first code snippet does not ensure that Futures are run in parallel, but rather that the ExecutionContext may choose to run them in parallel, if it has available threads. Futures are about concurrency (running computations independently of one another) rather than parallelism (running computations at the same time). – Cyrille Corpet Nov 16 '16 at 20:52
  • Do the actor and the futures it executes run in the same thread? – Utsav Nov 17 '16 at 02:17
  • @user520209 It depends on if the actor system and the future use the same execution context. It is possible to run a future in a separate execution context (which can be thought of as another thread pool) in scala: `var someEventualResult = Future { doSomeTask() } (myExecutionContext)` – Jeff Steinmetz Nov 21 '16 at 02:48
  • Try replacing Random.nextInt with fixed numbers at first. Running random concurrent code 'quite a few times' isn't a good way to check something. – kardapoltsev Nov 26 '16 at 06:23
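
One possible reading of the output in the question, sketched here without Akka so that it stays self-contained: a `Thread.sleep` placed before the future is created runs on the caller's thread, so the second request is not even issued until the first method has returned. `request`, `events`, and the `Future { ... }` body (standing in for the ask to a child actor) are hypothetical names used only for this illustration, not the asker's code.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingBeforeFuture {
  // Thread-safe log of what happened, in the order it happened.
  val events = new ConcurrentLinkedQueue[String]()

  // Hypothetical stand-in for getMessageFromC1 / getMessageFromC2.
  // The sleep and the first log entry run synchronously on the caller's thread;
  // only the body of Future { ... } runs asynchronously.
  def request(name: String, value: Int): Future[Int] = {
    Thread.sleep(200) // blocks the caller before any Future exists
    events.add(s"Executing $name")
    Future { events.add(s"in worker for $name"); value }
  }

  def run(): Int = {
    val r1 = request("c1", 5)  // returns only after its sleep has finished
    val r2 = request("c2", 10) // starts sleeping only now
    Await.result(for { a <- r1; b <- r2 } yield a + b, 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    println(run())
    events.forEach(e => println(e))
  }
}
```

Under this reading the futures themselves are not serialized by the actor. The worker for `c1` typically finishes while the caller is still sleeping inside the second `request`, which would produce the same interleaving as the output in the question.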

0 Answers