7

I have a service (let's call it Service A) which uses Akka HTTP server to handle incoming requests. I also have a third-party application (Service B) which provides several web services. The purpose of Service A is to transform client requests, call one or more web services of Service B, merge/transform the results, and serve them back to the client.

I am using Actors for some parts, and plain Futures for others. To make a call to Service B, I use the Akka HTTP client:

Http.get(actorSystem).singleRequest(HttpRequest.create()
        .withUri("http://127.0.0.1:8082/test"), materializer)
        .onComplete(...)

The issue is that a new flow is created for each Service A request, and when there are many concurrent connections, this fails with `akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32]`.

I already asked this question and got a suggestion to use a single Flow: "How to properly call Akka HTTP client for multiple (10k - 100k) requests?"

While it works for a batch of requests coming from a single place, I don't know how to use a single Flow from all my concurrent request handlers.

What is the correct "Akka-way" to do it?

relgames

3 Answers

13

I think you could use Source.queue to buffer your requests. The code below assumes that you need the answer from the third-party service, so returning a Future[HttpResponse] is desirable. This way you can also choose an overflow strategy to prevent resource starvation.

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.{Failure, Success}

import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem("main")
implicit val materializer = ActorMaterializer()
val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
  .via(pool)
  .toMat(Sink.foreach({
    case ((Success(resp), p)) => p.success(resp)
    case ((Failure(e), p)) => p.failure(e)
  }))(Keep.left)
  .run
val promise = Promise[HttpResponse]()
val request = HttpRequest(uri = "/") -> promise

val response = queue.offer(request).flatMap { buffered =>
  if (buffered) promise.future
  else Future.failed(new RuntimeException("Request dropped: queue buffer is full"))
}

Await.ready(response, 3.seconds)

(code copied from my blog post)

khiramatsu
  • Nice, thank you! I actually contacted devs on gitter and also got a suggestion to use Source.queue and promises. But I did not have time yet to try it. With your code it will be easier! – relgames Feb 01 '16 at 15:05
  • @khiramatsu do you have an example in Java? – Tiago Sep 13 '16 at 03:22
  • This solution was added to the [original documentation](http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue). I just want to add that when you use `cachedHostConnectionPool` do not forget to configure your setting of `akka.http.host-connection-pool` such as `max-open-requests` and `max-connections` – Ilya Dzivinskyi Aug 08 '17 at 22:00
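Expanding on the last comment: those pool settings live under `akka.http.host-connection-pool` in `application.conf`. A sketch with illustrative values (tune them to your own workload; note that Akka requires `max-open-requests` to be a power of 2):

```hocon
akka.http.host-connection-pool {
  # maximum parallel connections the pool opens to the target host
  max-connections = 32
  # maximum requests in flight or queued at any one time; exceeding this
  # is what raises the BufferOverflowException from the question
  max-open-requests = 128
}
```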
4

Here is a Java version of the accepted answer:

final Flow<
    Pair<HttpRequest, Promise<HttpResponse>>,
    Pair<Try<HttpResponse>, Promise<HttpResponse>>,
    NotUsed> flow =
    Http.get(actorSystem).superPool(materializer);

final SourceQueue<Pair<HttpRequest, Promise<HttpResponse>>> queue =
    Source.<Pair<HttpRequest, Promise<HttpResponse>>>queue(BUFFER_SIZE, OverflowStrategy.dropNew())
        .via(flow)
        .toMat(Sink.foreach(p -> p.second().complete(p.first())), Keep.left())
        .run(materializer);

...

public CompletionStage<HttpResponse> request(HttpRequest request) {
    log.debug("Making request {}", request);

    Promise<HttpResponse> promise = Futures.promise();
    return queue.offer(Pair.create(request, promise))
        .thenCompose(buffered -> {
            if (buffered instanceof QueueOfferResult.Enqueued$) {
                return FutureConverters.toJava(promise.future())
                    .thenApply(resp -> {
                        if (log.isDebugEnabled()) {
                            log.debug("Got response {} {}", resp.status(), resp.getHeaders());
                        }
                        return resp;
                    });
            } else {
                log.error("Could not buffer request {}", request);
                return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.SERVICE_UNAVAILABLE));
            }
        });
}
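The queue-plus-promise pattern above is not Akka-specific. As a dependency-free sketch of the same idea (all class and method names here are hypothetical, plain `java.util.concurrent` only): many concurrent handlers offer (request, promise) pairs into one bounded queue, and a single worker drains it and completes each promise.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Dependency-free sketch of the bounded-queue + promise pattern.
public class QueueDemo {
    static final class Job {
        final String request;
        final CompletableFuture<String> promise;
        Job(String request, CompletableFuture<String> promise) {
            this.request = request;
            this.promise = promise;
        }
    }

    static final BlockingQueue<Job> queue = new ArrayBlockingQueue<>(10);

    // What each request handler calls; fails fast when the buffer is full,
    // mirroring OverflowStrategy.dropNew
    static CompletableFuture<String> request(String req) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (!queue.offer(new Job(req, promise))) {
            promise.completeExceptionally(new IllegalStateException("queue full"));
        }
        return promise;
    }

    public static void main(String[] args) throws Exception {
        // Single consumer, analogous to the one materialized pool flow
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Job job = queue.take();
                    job.promise.complete("response to " + job.request);
                }
            } catch (InterruptedException e) { /* shutdown */ }
        });
        worker.setDaemon(true);
        worker.start();

        System.out.println(request("req-1").get(3, TimeUnit.SECONDS));
        // → response to req-1
    }
}
```

The key design point is the same as in the Akka answers: the bounded queue is created once and shared, so backpressure is applied at a single place instead of each caller opening its own resources.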
relgames
0

All you need to do is set up a HostConnectionPool to Service B within your Service A code. This gives you a Flow that can be embedded into your Service A streams to dispatch requests from A to B over a connection pool, instead of opening a new connection per stream. From the documentation:

As opposed to the Connection-Level Client-Side API the host-level API relieves you from manually managing individual HTTP connections. It autonomously manages a configurable pool of connections to one particular target endpoint (i.e. host/port combination).

Each materialization of this Flow, in different streams, will draw from this underlying pool of connections:

The best way to get a hold of a connection pool to a given target endpoint is the Http.get(system).cachedHostConnectionPool(...) method, which returns a Flow that can be "baked" into an application-level stream setup. This flow is also called a "pool client flow".
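This is not Akka code, but as a loose, stdlib-only analogy (all names hypothetical): the pool client flow plays the role of a single shared, bounded executor that every handler submits to, rather than each handler spawning its own connection.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Loose analogy: one shared fixed-size pool serves all concurrent handlers,
// bounding resource use the way cachedHostConnectionPool bounds connections.
public class SharedPoolDemo {
    // A single pool shared by all request handlers (4 "connections")
    static final ExecutorService pool = Executors.newFixedThreadPool(4);

    static CompletableFuture<String> callServiceB(String request) {
        return CompletableFuture.supplyAsync(() -> "B answered " + request, pool);
    }

    public static void main(String[] args) throws Exception {
        // Many concurrent "handlers" all draw from the same bounded pool
        CompletableFuture<?>[] calls = new CompletableFuture<?>[100];
        for (int i = 0; i < 100; i++) {
            calls[i] = callServiceB("req-" + i);
        }
        CompletableFuture.allOf(calls).get(5, TimeUnit.SECONDS);
        System.out.println("all 100 calls completed over 4 workers");
        pool.shutdown();
    }
}
```

As with the pool client flow, the pool here is created once and referenced from everywhere, which is why no caller can exceed the configured bound on its own.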

Ramón J Romero y Vigil
  • No, it did not work, I get akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [4]. Here is my code https://gist.github.com/relgames/0c2005bae42922ab2da3 – relgames Jan 20 '16 at 22:46
  • @relgames I think you need to refactor your code a bit. There should not be a Source.single within a for-loop. Instead, the for loop should be part of the stream. As an example in scala : `Source(1 to 100).map(i => (HttpRequest.create("/test"), i).via(flow)`... This prevents the materialization of 100 Streams, and instead utilizes a single Stream. – Ramón J Romero y Vigil Jan 20 '16 at 23:02
  • 1
    Service A is handling HTTP requests using HTTP DSL. A handler is then calls Service B using HTTP Client API and forwards results back to a requestor. So in the real program there is no for-loop, but multiple requests coming from external clients. I was just trying to mimic it using a for-loop. My original question is exactly about that: how do I use single Flow (and single materializer?) from multiple threads/services spread in time. – relgames Jan 21 '16 at 10:56
  • 1
    @relgames did you get this problem solved? I'm with this same issue right now. Thanks in advance. – Tiago Sep 13 '16 at 13:19
  • @tiago yes, check this answer https://stackoverflow.com/a/35115314/1300246 – relgames Sep 14 '16 at 10:03
  • Thanks @relgames. I intend to post an answer using java to help others who aren't familiar with scala to achieve this. – Tiago Sep 14 '16 at 16:56