
I have an Akka messaging engine that delivers millions of messages a day, both SMS and email. I need to introduce a new message type (PushNotification) in which each request calls a REST API; it will also process millions of messages.

I believe that calling a web service is a blocking operation, so from what I have read I need to add a separate dispatcher for this new actor. My question is: does it necessarily need to be a thread-pool-executor with a fixed-pool-size, as mentioned here (see https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html), or is it possible to use a fork-join-executor instead?

Also, what is the best approach to avoid affecting the two existing message types (SMS and email)? That is, how do I avoid starving their thread pools? Currently email uses a separate dispatcher and SMS uses the default dispatcher.

Instead of creating a new dispatcher for the actor that performs the blocking operation (calling the web service), is there another way, such as creating a reactive web service?

Diego Ramos
  • What REST client are you using? Akka Http works asynchronously using the same thread pool and integrates well with Akka Actors. – Tim Jun 14 '19 at 06:04
  • @Tim, I am using Jersey (com.sun.jersey.api.client.ClientResponse). Do you mean Akka Http is reactive? – Diego Ramos Jun 18 '19 at 20:52

1 Answer


Consuming a RESTful API from a web service does not have to be blocking.

An easy way to consume a RESTful API from an actor is to use the Akka HTTP client. It lets you send an HTTP request and have the result sent back to an actor as a message, using the pipeTo method.

This is a very cut-down example (lightly modified from the sample in the documentation).

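import akka.actor.{Actor, ActorRef, PoisonPill, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.pattern.pipe
import scala.concurrent.ExecutionContext

object RestWorker {
  def props(replyTo: ActorRef): Props =
    Props(new RestWorker(replyTo))
}

class RestWorker(replyTo: ActorRef) extends Actor {
  // pipeTo needs an ExecutionContext; the actor system's default dispatcher is used here
  implicit val ec: ExecutionContext = context.system.dispatcher

  // Explicit Unit return type: without it the body's Future type is inferred
  // and the override of Actor.preStart does not compile
  override def preStart(): Unit = {
    // singleRequest returns a Future[HttpResponse] without blocking a thread;
    // pipeTo delivers the completed response to this actor as a message
    Http(context.system).singleRequest(HttpRequest(uri = "https://1.2.3.4/resource"))
      .pipeTo(self)
  }

  def receive: Receive = {
    case resp: HttpResponse =>
      val response = ??? // Process response

      replyTo ! response

      // One worker per request: stop once the reply has been sent
      self ! PoisonPill
  }
}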
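
If the blocking Jersey client has to stay for now, the dedicated dispatcher raised in the question could be sketched roughly as below. This is not part of the example above, only an illustration under assumptions: `BlockingPushClient`, `PushActor`, `PushDemo`, the dispatcher name `blocking-io-dispatcher` and the pool size of 16 are all hypothetical, and the configuration follows the thread-pool-executor pattern from the Akka documentation linked in the question.

```scala
import akka.actor.{Actor, ActorSystem}
import akka.pattern.pipe
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-in for a blocking REST call (e.g. made with the Jersey client).
object BlockingPushClient {
  def send(payload: String): String = {
    Thread.sleep(100) // simulates blocking network I/O
    s"sent:$payload"
  }
}

class PushActor extends Actor {
  // Look up the dedicated pool; the id must match the config key in PushDemo.
  private implicit val blockingEc: ExecutionContext =
    context.system.dispatchers.lookup("blocking-io-dispatcher")

  def receive: Receive = {
    case payload: String =>
      // The blocking call runs on the dedicated pool, so threads of the
      // dispatchers used by the SMS and email actors are not tied up.
      // sender() is read here, on the actor's thread, before the Future completes.
      Future(BlockingPushClient.send(payload)).pipeTo(sender())
  }
}

object PushDemo {
  // Assumed sizing: 16 threads is an arbitrary illustrative value.
  val config: Config = ConfigFactory.parseString(
    """
    blocking-io-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        fixed-pool-size = 16
      }
      throughput = 1
    }
    """).withFallback(ConfigFactory.load())

  def start(): ActorSystem = ActorSystem("push-demo", config)
}
```

With this layout the blocking calls are confined to their own fixed-size pool, so a slow web service can exhaust at most those threads. The non-blocking Akka HTTP client shown above avoids the need for the extra pool altogether.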
Tim
  • Underlying concept is called java NIO, or non-blocking IO. Check out e.g. https://stackoverflow.com/questions/25099640/non-blocking-io-vs-async-io-and-implementation-in-java/25100342#25100342 – ygor Jun 14 '19 at 09:34
  • The underlying concept is called "asynchronous IO", and it is up to Akka Http to choose how to implement it. Blocking/non-blocking is a different concept. – Tim Jun 14 '19 at 09:56