in Scala, I have an akka http client class with some local binding:
class AkkaConPoolingHttpClient(
override val timeout: Option[FiniteDuration] = None,
val localBinding: Option[InetSocketAddress] = None,
val userAgentHeader: Option[String] = None)(
implicit val config: HttpClient.Config,
val system: ActorSystem,
val materializer: Materializer)
extends AkkaHttpClient {
protected val http = Http()
override def dispatch(request: HttpRequest): Future[HttpResponse] = {
val effectivePort = request.uri.effectivePort
val connection =
http.outgoingConnection(
request.uri.authority.host.address(),
port = effectivePort,
localAddress = localBinding)
val preparedRequest = userAgentHeader match {
case Some(userAgent) => fixUri(request.withHeaders(request.headers ++ Seq(headers.`User-Agent`(userAgent))))
case None => fixUri(request)
}
Source.single(preparedRequest) via connection runWith Sink.head
}
object AkkaConPoolingHttpClient {
private def fixUri(request: HttpRequest): HttpRequest =
request.withUri(request.uri.toRelative)
}
and I'm trying to see if it reuses the connections and it seems it doesn't:
val connectionCount = new AtomicInteger()
val testServerFuture = Http().bind("127.0.0.1", 0).to {
Sink.foreach { incomingConnection =>
connectionCount.incrementAndGet()
incomingConnection.flow.join(Flow[HttpRequest].map(_ => HttpResponse())).run()
}
}.run()
val testServerPort = Await.result(testServerFuture, defaultExpectTimeout)
.localAddress.getPort
val address = "127.0.0.1"
val addr = Some(new InetSocketAddress(address, 0))
val client = new AkkaConPoolingHttpClient(localBinding = addr)
// Send some requests concurrently
val requests = List(
Get(s"http://127.0.0.1:$testServerPort/1"),
Get(s"http://127.0.0.1:$testServerPort/2"),
Get(s"http://127.0.0.1:$testServerPort/3"))
val responses = Await.result(
Future.sequence(requests.map(client.sendRequest)),
defaultExpectTimeout)
// Send some more requests -- the connections from before should be reused
Thread.sleep(500)
val responses2 = Await.result(
Future.sequence(requests.map(client.sendRequest)),
defaultExpectTimeout)
// Usually this is "3", occasionally "4".
connectionCount.get() must beLessThanOrEqualTo(4)
Unfortunately, the test fails, connectionCount.get() has 6 connections. Why isn't it reuse the connections? what's wrong with this code?
I also tried with:
val effectivePort = request.uri.effectivePort
val clientSettings = ClientConnectionSettings(system).withSocketOptions(SO.ReuseAddress(true) :: Nil)
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
Http().outgoingConnection(
request.uri.authority.host.address(),
port = effectivePort,
localAddress = localBinding,
settings = clientSettings
)
..................
Source.single(preparedRequest)
.via(connectionFlow)
.runWith(Sink.head)
But I still have 6 connections in my test...