2

I'm playing with the akka-stream-and-http-experimental 1.0. So far, I've a user service that can accept and respond to HTTP requests. I'm also going to have an appointment service that can manage appointments. In order to make appointments, one must be an existing user. Appointment service will check with the user service if the user exists. Now this obviously can be done over HTTP but I'd rather have the appointment service send a message to the user service. Being new to this, I'm not clear how to use actors (as akka-http abstracts that) to send and receive messages. There's mention of ActorRef and ActorPublisher in the doc but no examples of the former and the later looks like an overkill for my need. My code looks like the following and is on Github:

trait UserReadResource extends ActorPlumbing {
  val userService: UserService
  val readRoute = {
  // route stuff
  }
}

trait ActorPlumbing {
  implicit val system: ActorSystem
  implicit def executor: ExecutionContextExecutor
  implicit val materializer: Materializer

  def config: Config
  val logger: LoggingAdapter
}

trait UserService { // Implemented by Slick and MongoDB in the backend
  def findByFirstName(firstName: String): Future[immutable.Seq[User]]
}

object UserApp extends App with UserReadResource with UserWriteResource with ActorPlumbing {
  override implicit val system = ActorSystem()
  override implicit def executor = system.dispatcher
  override implicit val materializer = ActorMaterializer()

  override def config = ConfigFactory.load()
  override val logger = Logging(system, getClass)

  private val collection = newCollection("users")
  val userRepository = new MongoDBUserRepository(collection)
  val userService: UserService = new MongoDBUserRepositoryAdapter(userRepository) with UserBusinessDelegate { 
    // implicitly finds the executor in scope. Ain't that cute?
    override implicit def executor = implicitly
  } 

  Http().bindAndHandle(readRoute ~ writeRoute, config.getString("http.interface"), config.getInt("http.port"))
}

Edit: I figured out how to send messages, which could be done using Source.actorRef. That only emits the messages into the stream. What I'd like to do is for the route handler class to receive the response. That way when I create the appointment service, it's actor can call the user service actor and receive the response in the same manner as the user route handler in my example does. Pseudo code:

val src = Source.single(name) \\ How to send this to an actor and get the response

Edit 2: Based on the @yardena answer, I came up with the following but the last line doesn't compile. My actor publisher returns a Future which I'm guessing will be wrapped in a Promise and then delivered as a Future to the route handler.

get {
  parameters("firstName".?, "lastName".?).as(FindByNameRequest) { name =>
    type FindResponse = Future[FindByNameResponse]

    val src: Source[FindResponse, Unit] = Source.actorPublisher[FindResponse](businessDelegateProps).mapMaterializedValue {
      _ ! name
    }
    val emptyResponse = Future.apply(FindByNameResponse(OK, Seq.empty))

    val sink = Sink.fold(emptyResponse)((_, response: FindResponse) => response)

    complete(src.runWith(sink)) // doesn't compile
  }
}
Abhijit Sarkar
  • 21,927
  • 20
  • 110
  • 219
  • Wrap your user service in an actor, and make service calls by sending messages to that actor. Then, your appointment service (wrapped in another actor) can also interact with the user service by sending the same messages to the user actor wrapping it. – Shadowlands Sep 12 '15 at 23:04
  • @Shadowlands That's what I intend to do. The question is how to I materialize and send messages to the actor from the route? Can you show code examples of using `ActorRef` from a route? – Abhijit Sarkar Sep 12 '15 at 23:10
  • Not sure which aspect you are struggling with - how familiar are you with the rest of Akka? You can call an actor simply enough - and get a response of type `Future` - with a line like `myActorRef ? myMessage` where the message can be of any immutable type (so anything from a `String` to a purpose-built case class, eg. `case class UserByFirstNameRequest(firstName: String)`. See [here](https://groups.google.com/forum/#!topic/akka-dev/ei-0OzzgKd0) for example. – Shadowlands Sep 13 '15 at 03:31
  • @Shadowlands You stated the obvious. What I am trying to understand is how the `UserReadResource` in my code would create an `Source[T, ActorRef]` and then materialize that into an `Actor`. Looks like I'll have to create a Publisher using `Source.actorPublisher(Props)` where the Props will be supplied by the `UserApp`. – Abhijit Sarkar Sep 13 '15 at 03:48

2 Answers2

2

I ended up with using Actor.ask. Simple.

Abhijit Sarkar
  • 21,927
  • 20
  • 110
  • 219
1

This link may be helpful: http://zuchos.com/blog/2015/05/23/how-to-write-a-subscriber-for-akka-streams/ and this answer by @Noah Accessing the underlying ActorRef of an akka stream Source created by Source.actorRef

Basically you have 2 choices: 1) if you want a "simple" actor, which will forward into the stream all messages that it receives, you can use Source.actorRef. Then you can pipeline the messages into UserService by creating a processing stage using mapAsync. 2) Another option, in case you want the actor to have some custom behavior, is to write your own ActorPublisher.

HTH

Community
  • 1
  • 1
yardena
  • 21
  • 1
  • On a quick glance, your first link seems relevant and it is doing what I'm trying to do as well. I'll take a look and let you know. – Abhijit Sarkar Sep 14 '15 at 01:03