4

I'm an experienced Java programmer and I'm starting developing actor-based Scala applications. In an application that I'm currently developing I have to deal with the implementation of a Sender actor exhibiting both an autonomous and reactive behavior. The scenario is the following (pseudo-code):

Actor Sender{

        Active behavior (must be active once the actor boots):
            do-in-sequence{
                send to Stdout A
                send to Stdout B
                send to Stdout C
                send stop to Stdout and then exit
            }

        Reactive behavior (must be active once the actor boots):
            as soon as received stop from StopNotifier -> send stop to Stdout and then exit
    }
}

Actor Stdout{
    Purely reactive behavior (i.e. wait for msg){
        as soon as received A -> print A
        as soon as received B -> print B
        as soon as received C -> print C
        as soon as received stop from Sender -> exit
    }
}
Actor StopNotifier
    Purely active behavior {
        compute, and when some condition is met -> send stop to Sender
    }

My question is: which is the best way to express an autonomous behavior for a Scala actor that need to integrate autonomy and reactivity (as described in this paper)?

In other words, which is the best way/style to code the Sender actor in the above example?

I came up with a solution (reported below) but since I'm not a scala guru (yet :)) I would like to know if what I've implemented can be improved in a better/nicer solution.

case object START
case object A
case object B
case object C
case object SENT_A
case object SENT_B
case object ACK_A
case object ACK_B
case object ACK_C
case object STOP

class Sender(stdout: Stdout) extends Actor {
    def act() {
        self!START
        while (true){
            receive {
                case START =>
                    stdout!?A
                    self!SENT_A
                case SENT_A =>
                    stdout!?B
                    self!SENT_B
                case SENT_B =>
                    stdout!?C
                    stdout!?STOP
                    exit()
                case STOP => {
                    Console.println("[Sender:]Received STOP, terminating")
                    stdout!?STOP
                    exit()
                }
            }
        }
    }
}

class Stdout() extends Actor {
    def act() { 
        while (true) {
            receive{
                case A =>
                    Console.println("[Stdout:]A")
                    reply(ACK_A)
                case B =>
                    Console.println("[Stdout:]B")
                    reply(ACK_B)
              case C =>
                    Console.println("[Stdout:]C")
                    reply(ACK_C)
                    exit()
              case STOP =>
                    Console.println("[Stdout:]Received STOP, terminating")
                    exit()
            }
        }
    }
}

class StopNotifier(sender: Sender) extends Actor {
    def act() {
        /*
         * The idea is that the StopNotifier should send a STOP message to the Sender
         * when a certain condition is met.
         * The sleep used here is just a semplification, since the detection of such
         * a condition is not relevant for the example.
         */

        Thread.sleep(200)
        Console.println("[StopNotifier:]Sending STOP to sender")
        sender ! STOP
        exit()
   }
}

object app extends Application {
    val stdout = new Stdout
    stdout.start
    val sender = new Sender(stdout)
    sender.start
    val stopNotifier = new StopNotifier(sender)
    stopNotifier.start
}

In particular what bothers me in my current implementation is the fact that, for being able to promptly react to the reception of the STOP message from the StopNotifier, I had the need to self-send messages at each execution step of the Sender (i.e. after sending A, B to the Stdout actor). It seems to me too tricky to be the right way to do the things :).

I tried also to develop other solution using other scala language contructs (e.g. asynchronous send, react, etc.) but in one way or another they seemed to me affected to other problems/tricks.

Does anyone have a better solution for dealing with the integration of autonomy and reactive behaviors in scala actors?

2 Answers2

4

If I understand correctly, you should use Akka actors instead, Akka FSM specifically, to model the sender as a State Machine. Akka actors have a built in stop mechanism, or you could always use your own message which could be handled from all states through the whenUnhandled handler.

See http://doc.akka.io/docs/akka/snapshot/scala/fsm.html

This is obviously overkill for this, but I assume you're trying to do something more complicated. You could also have Stdout "watch" Sender so that it terminates when Sender terminates instead of when it receives a specific message. See Lifecycle Monitoring aka DeathWatch.

package fsmTest

import akka.actor._
import akka.util.duration._

sealed trait Msg
case object A extends Msg
case object B extends Msg
case object C extends Msg

sealed trait SenderState
case object Started extends SenderState
case object SentA extends SenderState
case object SentB extends SenderState

case class SenderData()

class Sender(stdout: ActorRef) 
  extends Actor 
  with FSM[SenderState, SenderData] {

  case object GoNextState

  startWith(Started, SenderData())

  when(Started) {
    case Event(GoNextState, data) => {
      stdout ! A
      goto(SentA) using data
    }
  }

  when(SentA) {
    case Event(GoNextState, data) => {
      stdout ! B
      goto(SentB) using data
    }
  }

  when(SentB) {
    case Event(GoNextState, data) => {
      stdout ! C
      goto(Started) using data
    }
  }

//      //Handle messages which aren't explicitly handled in state here
//      whenUnhandled {
//        case Event(SomeCustomStop, data) => {
//          stop(FSM.Shutdown)
//        }
//      }

  setTimer("goNextState", GoNextState, 1 second, repeat = true)

  initialize
}

class Stdout() extends Actor {
  def receive = {
    case msg: Msg => {
      context.watch(sender) //Not sure if you're gonna want to do this here, but you get the point
      println(msg)
    }
    case Terminated(_) => context.stop(self)
  }
}

object FSMTest extends App {

  implicit val system = ActorSystem("Testing")
  val stdout = system.actorOf(Props[Stdout], "stdout")
  val sender = system.actorOf(Props(new Sender(stdout)), "sender")

  system.scheduler.scheduleOnce(5 seconds) {
    system.stop(sender)
    system.shutdown()
  }
  system.awaitTermination(10 seconds)
}

Regardless of how you implement the state in the sender, if you want to use Actors to model it, I believe that you will need to "self send" messages, either in the event handling or with the timer as I have above.

drstevens
  • 2,903
  • 1
  • 21
  • 30
  • I see... I will for sure have a better look to akka FSM-based actors then (I did not know akka). However, following your reply seems to me that the scala-based solution I have adopted is the most appropriate for this kind of problem right? As you said, "self send" messages became mandatory in order to achieve that sort of integration between autonomous and reactive behaviors. – the_dark_destructor Jul 11 '12 at 12:41
  • @the_dark_destructor This question is probably too open ended. If you need the concurrency then Akka actors are probably the way to go. It may be overkill though. Note that I didn't realize you needed the calls to stdout to be synchronous. This leads me to believe that this is overkill. You should use `?` instead of `!` to get back a future. – drstevens Jul 12 '12 at 16:06
1

Messages sent in order from one actor inside one react or receive block will be received in that order. (You may have other messages from other actors interspersed, but you won't send A then B and get B then A.)

So you can just

stdout ! A
stdout ! B
stdout ! C

unless you need to do something else as well.

Rex Kerr
  • 166,841
  • 26
  • 322
  • 407
  • Yes,I know that. However in the real application I need to retrieve the replies in sequence (that is why I used !?). Moreover, as described in the question, I need to integrate this active behaviour with the reactive behaviour associated to the reception of the stop from the StopNotifier. – the_dark_destructor Jul 11 '12 at 07:57
  • @the_dark_destructor - !? is synchronous--you don't pass on from that point until you get the reply, so you can't respond that promptly to a stop anyway. If it's okay to wait for one, why is it not okay to wait for all three? – Rex Kerr Jul 11 '12 at 14:19