
I have a remote actor, Bar and a local actor, Foo. I want to use Foo to pass messages to Bar on each invocation of a CLI.

Bar can be passed messages successfully, but Foo hangs while waiting for a message. To fix this, I added a sys.exit(0) at the end of Foo's main. This causes an association issue with Foo's system.

How can I shut down my local actor between successive CLI invocations without killing it manually?

Shut up and give me the code!


Foo:

build.sbt

name := "Foo"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11"
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.5.0"

fork in run := true

Main.scala

import akka.actor._
import com.typesafe.config.ConfigFactory

case class Config(mode: String = "", greeting: String="")

class Foo extends Actor {
  // look up the remote actor (actorSelection does not create it)
  val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")

  def receive = {
    case method: String => BarActor ! method
  }
}

object CommandLineInterface {

  val config = ConfigFactory.load()
  val system = ActorSystem("FooSystem", config.getConfig("FooApp"))

  val FooActor = system.actorOf(Props[Foo], name = "FooActor")

  val parser = new scopt.OptionParser[Config]("Foo") {
    head("foo", "1.x")

    help("help").text("prints usage text")

    opt[String]('m', "method").action( (x, c) =>
      c.copy(greeting = x) ).text("Bar will greet with <method>")
  }
}

object Main extends App {
  import CommandLineInterface.{parser, FooActor}

  parser.parse(args, Config()) match {
    case Some(config) => FooActor ! config.greeting
    case None => sys.error("Bad news...")
  }
  /*
    With sys.exit(0) commented out, this hangs, but Bar greets.
    With sys.exit(0) uncommented, this doesn't hang, but Bar doesn't greet either.
   */
 
  //sys.exit(0)
}

application.conf

FooApp {
  akka {
    loglevel = "INFO"
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 0
      }
      log-sent-messages = on
      log-received-messages = on
    }
  }
}

Bar:

build.sbt

name := "Bar"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11"
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11"

Main.scala

import akka.actor._
import com.typesafe.config.ConfigFactory

class Bar extends Actor {
  def receive = {
    case greeting: String => Bar.greet(greeting)
  }
}

object Bar {
  val config = ConfigFactory.load()
  val system = ActorSystem("BarSystem", config.getConfig("BarApp"))
  val BarActor = system.actorOf(Props[Bar], name = "BarActor")

  def greet(greeting: String) = println(greeting)

  def main(args: Array[String]): Unit = {
    /* Intentionally empty */
  }
}

application.conf

BarApp {
  akka {
    loglevel = "INFO"
    actor {
      provider = remote
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
      log-sent-messages = on
      log-received-messages = on
    }
  }
}

Run Foo with sbt 'run-main Main -m hello', and run Bar with sbt 'run-main Bar' (Bar's entry point is the Bar object, not Main).

Sorry for the long code, but it's the MVCE for my problem.

How can I achieve my desired behavior, where the CLI actor dies between successive CLI invocations while the remote actor keeps waiting for new messages?

erip

1 Answer


This is happening because you call sys.exit(0) immediately after sending a message to FooActor, so there's a significant chance that the application exits before FooActor gets the chance to even read the message, let alone forward it to BarActor.

There seem to be many possible solutions, one of them being:

class Foo extends Actor {
  // look up the remote actor (actorSelection does not create it)
  val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")

  override def receive = {
    case method: String => {
      BarActor ! method
      self ! PoisonPill
    }
  }

  override def postStop = {
    context.system.terminate
  }
}

Unfortunately, it turns out that the system still gets shut down before dispatching the message to Bar.

I couldn't find any reasonable solution to this issue if you want to send a message in a "fire and forget" style. However, in most cases, it's desirable to get some kind of response from the remote actor, so you could do:

class Foo extends Actor {
  // look up the remote actor (actorSelection does not create it)
  val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")

  override def receive = {
    case method: String => {
      BarActor ! method
      context.become(waitingToKillMyself)
    }
  }

  def waitingToKillMyself: Receive = {
    case response: String => {
      println(response)
      self ! PoisonPill
    }
  }

  override def postStop = {
    context.system.terminate
  }
}

// ...

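object Main extends App {
  import CommandLineInterface.{parser, FooActor, system}
  import system.dispatcher // implicit ExecutionContext required by the scheduler
  import scala.concurrent.duration._ // needed for 10.seconds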

  parser.parse(args, Config()) match {
    case Some(config) => {
      FooActor ! config.greeting
      system.scheduler.scheduleOnce(10.seconds, FooActor, PoisonPill)
    }

    case None => sys.error("Bad news...")
  }
}

Bar:

class Bar extends Actor {
  def receive = {
    case greeting: String => {
      Bar.greet(greeting)
      sender() ! "OK"
    }
  }
}
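
The request/response idea above can also be driven from the caller with Akka's ask pattern, which returns a Future that either completes with the reply or fails after a timeout. The following is only a minimal sketch, not a tested fix for the remoting setup in the question: `AckingBar`, `AskThenTerminate`, and `sendAndShutdown` are hypothetical names, and a local actor stands in for the remote `BarActor` so that the example runs in a single JVM. With remoting you would presumably ask the `actorSelection` from the question instead.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for the remote Bar: prints the greeting and acknowledges it.
class AckingBar extends Actor {
  def receive = {
    case greeting: String =>
      println(greeting)
      sender() ! "OK"
  }
}

object AskThenTerminate {
  // Sends `greeting`, waits up to `wait` for a reply, then shuts the system
  // down whether or not a reply arrived.
  def sendAndShutdown(greeting: String, wait: FiniteDuration): Try[Any] = {
    val system = ActorSystem("FooSystem")
    implicit val timeout: Timeout = Timeout(wait)

    // Assumption: a local actor replaces the remote one. In the question's setup
    // this would be
    // system.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")
    val bar = system.actorOf(Props[AckingBar], "BarActor")

    // Block the CLI's main thread until Bar replies or the timeout expires.
    val reply = Try(Await.result(bar ? greeting, wait))

    // Terminate only after the reply (or timeout), so the message is not
    // dropped by an early shutdown.
    Await.ready(system.terminate(), 10.seconds)
    reply
  }

  def main(args: Array[String]): Unit =
    println(sendAndShutdown("hello", 5.seconds))
}
```

Compared with the scheduled `PoisonPill`, this keeps the timeout and the shutdown in one place, and the process exits as soon as the acknowledgement arrives. A lost request or a lost reply still only shows up as a timeout.
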
  • This seems like a solid solution if it's a one-time message, but I want `Bar` to basically always be waiting for a message. Will this achieve that end? – erip Nov 23 '16 at 19:08
  • Isn't `BarSystem` a separate actor system? – Paweł Bartkiewicz Nov 23 '16 at 19:13
  • Sorry, didn't see your comment after edit. Yes, I believe this should work fine (can't try this myself right now, I can do it later if you want). Since `FooSystem` and `BarSystem` are separate actor systems, `shutdown` or `terminate` should stop `FooSystem` only ([documentation](http://doc.akka.io/api/akka/2.4/index.html#akka.actor.ActorSystem@terminate():scala.concurrent.Future[akka.actor.Terminated])). – Paweł Bartkiewicz Nov 23 '16 at 19:24
  • I've added your answer and I'm finding that `Bar` isn't receiving my messages, but `Foo` is receiving the `"Done"` message anyway. I'm also getting dead-letters messages along with `Remote daemon shut down; proceeding with flushing remote transports.`. I'll dig a bit into that. – erip Nov 23 '16 at 19:26
  • @erip Edited my answer, this solution seems cleaner, hopefully it will also work better. I'll try this myself when I get a chance. – Paweł Bartkiewicz Nov 23 '16 at 19:50
  • @erip You're right, for some reason Akka won't dispatch the message before shutting down the system. That's probably because there are separate queues for local and remote actors, so local PoisonPill gets sent first. It would be nice if Akka had some method to flush all pending messages before terminating. – Paweł Bartkiewicz Nov 23 '16 at 22:05
  • How do you know you have flushed all messages? – Viktor Klang Nov 24 '16 at 14:34
  • @ViktorKlang As an Akka user? There's no way as far as I know, but considering you're working on Akka I guess you know better. So you're asking in the context of akka-remote? You could check the TCP buffer. There's still no guarantee that the message was correctly parsed at the other end, but at least you did everything you could to ensure it was sent before shutting down the local system. Which still doesn't make sense in most cases. I'm not an expert on Akka internals, just read most of the docs, some source and using Akka from time to time. – Paweł Bartkiewicz Nov 24 '16 at 15:00
  • My question was a leading one, since it all boils down to the 2 Generals problem. (Well, that and the fact that a message could hit the internal buffers at any point, in which case you need to have a timeout, but in any case you won't be sure) – Viktor Klang Nov 24 '16 at 16:16
  • @ViktorKlang Yes, I'm not saying this can be solved perfectly, but the Two Generals Problem affects everything, yet we do have such things as video streaming today and it works most of the time. If you assume that remote communication is always unreliable, then why create akka-remote in the first place? I'm only saying there's currently no way to send a fire-and-forget message and terminate the system afterwards with **any** hope of success. Which seems like a very rare scenario, so I fully understand that it may be not worth the effort. – Paweł Bartkiewicz Nov 24 '16 at 16:42
  • Anyway, sorry if it sounded like I was criticizing Akka, it's a really great project and I understand the design decisions behind it. I'll happily continue the discussion somewhere else (SO chat, Github ticket?) if you'd like, maybe even get to know Akka's source code better and try to come up with a solution in my spare time, if you think it's worth our time. For me, it's a "nice to have" feature request, nothing critical. – Paweł Bartkiewicz Nov 24 '16 at 17:05
  • In this case, just adding a bit of a delay before exiting is enough, it would be just as safe as any other solution I can think about :) – Viktor Klang Nov 24 '16 at 18:06
  • @ViktorKlang Yes, but then it would always cause the delay, right? Even if the message got processed very fast. I think waiting for the response (with a reasonable timeout) is much more efficient in most cases. – Paweł Bartkiewicz Nov 24 '16 at 18:10
  • Absolutely, but both the request and the response can be lost. :) Two Generals and all. :) But yeah… – Viktor Klang Nov 24 '16 at 18:53