0

in Scala, I have an akka http client class with some local binding:

    class AkkaConPoolingHttpClient(
        override val timeout: Option[FiniteDuration] = None,
        val localBinding: Option[InetSocketAddress] = None,
        val userAgentHeader: Option[String] = None)(
        implicit val config: HttpClient.Config,
        val system: ActorSystem,
        val materializer: Materializer)
      extends AkkaHttpClient  {

  protected val http = Http()

  override def dispatch(request: HttpRequest): Future[HttpResponse] = {

    val effectivePort = request.uri.effectivePort

    val connection =
      http.outgoingConnection(
        request.uri.authority.host.address(),
        port = effectivePort,
        localAddress = localBinding)

    val preparedRequest = userAgentHeader match {
      case Some(userAgent) => fixUri(request.withHeaders(request.headers ++ Seq(headers.`User-Agent`(userAgent))))
      case None => fixUri(request)
     }

    Source.single(preparedRequest) via connection runWith Sink.head
  }

object AkkaConPoolingHttpClient {
  private def fixUri(request: HttpRequest): HttpRequest =
    request.withUri(request.uri.toRelative)
}

and I'm trying to see if it reuses the connections and it seems it doesn't:

      val connectionCount = new AtomicInteger()
      val testServerFuture = Http().bind("127.0.0.1", 0).to {
        Sink.foreach { incomingConnection =>
          connectionCount.incrementAndGet()
          incomingConnection.flow.join(Flow[HttpRequest].map(_ => HttpResponse())).run()
        }
      }.run()

      val testServerPort = Await.result(testServerFuture, defaultExpectTimeout)
        .localAddress.getPort
      val address = "127.0.0.1"
      val addr = Some(new InetSocketAddress(address, 0))
      val client = new AkkaConPoolingHttpClient(localBinding = addr)

      // Send some requests concurrently
      val requests = List(
        Get(s"http://127.0.0.1:$testServerPort/1"),
        Get(s"http://127.0.0.1:$testServerPort/2"),
        Get(s"http://127.0.0.1:$testServerPort/3"))

      val responses = Await.result(
        Future.sequence(requests.map(client.sendRequest)),
        defaultExpectTimeout)

      // Send some more requests -- the connections from before should be reused
      Thread.sleep(500)
      val responses2 = Await.result(
        Future.sequence(requests.map(client.sendRequest)),
        defaultExpectTimeout)

      // Usually this is "3", occasionally "4". 
      connectionCount.get() must beLessThanOrEqualTo(4)

Unfortunately, the test fails, connectionCount.get() has 6 connections. Why isn't it reuse the connections? what's wrong with this code?

I also tried with:

val effectivePort = request.uri.effectivePort
val clientSettings = ClientConnectionSettings(system).withSocketOptions(SO.ReuseAddress(true) :: Nil)
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
  Http().outgoingConnection(
        request.uri.authority.host.address(),
        port = effectivePort,
        localAddress = localBinding,
        settings = clientSettings
  )
..................
Source.single(preparedRequest)
      .via(connectionFlow)
      .runWith(Sink.head)

But I still have 6 connections in my test...

WDrgn
  • 521
  • 10
  • 29
  • To properly diagnose the problem I believe the definition of `AkkaHttpClient` and `sendRequest` is necessary. You only show `dispatch` but don't explain how `sendRequest` calls `dispatch`. – Ramón J Romero y Vigil Jan 14 '18 at 14:08

1 Answers1

1

Problem

The problem is rooted in the fact that you are creation a new connection for each request. The client code is actually quite clear:

Source.single(preparedRequest) via connection runWith Sink.head

Each request is being sent through a newly instantiated connection. This is due to a general design flaw where you are getting the address from the request:

val connection =
  http.outgoingConnection(
    request.uri.authority.host.address(), //address comes from request
    port = effectivePort,
    localAddress = localBinding)

It would be more efficient to establish the address once (ensuring a single Connection), and then each Request would just need the path.

Solution

To use a single connection you'll have to create a single Flow and send all of your requests through that, as described here.

Ramón J Romero y Vigil
  • 17,373
  • 7
  • 77
  • 125
  • <> Can you please show me how can I do that with my code? – WDrgn Jan 14 '18 at 14:57
  • @WDrgn That would only be possible with a significant restructuring of `AkkaConPoolingHttpClient` and a re-design of `dispatch` since you are depending on the `HttpRequest` to get the host address (as explained in my answer). See the linked answer in my answer for an example... – Ramón J Romero y Vigil Jan 14 '18 at 15:17
  • I cannot use hardcoded values into that class... the outgoingConnection should come somehow from the request... – WDrgn Jan 14 '18 at 16:25
  • @WDrgn Then by definition you cannot use the same connection for all `HttpRequest` objects since it is possible they would point to different host & port. You either get to eat your cake or have it, not both... – Ramón J Romero y Vigil Jan 14 '18 at 16:41
  • Ramon, you were right about that code, that a new connection was made for every request but I didn't found yet the solution, is it any way to have localAddress into a Http().singleRequest? or how should I use http.outgoingConnection to make an connection pool? because I also tried with connectionFlow with an hardcoded host and I get the same problem in reusing the connections. – WDrgn Jan 25 '18 at 08:39
  • @WDrgn If you follow the example in the link I provided under "Solution", and you send all Requests through that Flow From a Single source instead of 1 Request per 1 Source, then you should only establish 1 connection to the server. To address your use of the Flow I would have to see your code... – Ramón J Romero y Vigil Jan 25 '18 at 11:14
  • ok, I've solve it by upgrading to akkaHttpV = "10.0.9", and I used http.singleRequest. In 10.0.3, singleRequest had an flow on localBinding. I found the solution here: https://github.com/akka/akka-http/issues/1284 – WDrgn Jan 25 '18 at 13:27