I am trying to send 200k messages to an akka-http server.

protected val someRouts: Route = pathPrefix("foo") {
  pathEndOrSingleSlash {
    put {
      entity(as[Foo]) { foo =>
        log.debug(s"/foo update $foo")
        complete(Future(Foo("a")).map(f => s"Got - $f "))
      }
    }
  }
}

Http().bindAndHandle(someRouts, "0.0.0.0", 9000)

From a different process I send 200K messages in a loop. Client code (simplified):

lazy val apiConnectionFlow: Flow[HttpRequest, HttpResponse, Any] =
  Http().outgoingConnection("0.0.0.0", 9000)

def request(request: HttpRequest): Future[HttpResponse] =
  Source.single(request).via(apiConnectionFlow).runWith(Sink.head)

for (i <- 1 to 200000) {
  request(RequestBuilding.Put("/foo", Foo(i)))
}

After a short while I get this exception:

[akka.actor.default-dispatcher-33] ERROR akka.io.TcpListener - Accept error: could not accept new connection
java.io.IOException: Too many open files in system
    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
    at akka.io.TcpListener.acceptAllPending(TcpListener.scala:112)
    at akka.io.TcpListener$$anonfun$bound$1.applyOrElse(TcpListener.scala:85)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.io.TcpListener.aroundReceive(TcpListener.scala:34)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

UPDATE: checking the open connections with `lsof -i tcp:9000 | wc -l`, I can see the count climbing well above 6500.

igx
  • Could you add the client side code? Also, the `IOException` is complaining that there are "too many open files in system" but your sample code doesn't open any files. Is the sample truly representative of the code that threw the exception? – Ramón J Romero y Vigil Sep 05 '17 at 16:53
  • @RamonJRomeroyVigil I added the client code. As for your remark, this is exactly my problem: I am not opening any files – igx Sep 05 '17 at 19:47

1 Answer

Your issue is that you are attempting to open 200,000 TCP connections to 0.0.0.0:9000 and the machine you are working with is not configured to allow that many file descriptors to be active at one time.

From the docs (emphasis mine):

Note that no connection is attempted until the returned flow is actually materialized! If the flow is materialized several times then several independent connections will be opened (one per materialization).

Check `ulimit -n`: it is probably well below 200K.
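
A minimal sketch of one way to avoid this, assuming the goal is to push every request through a single materialization of the connection flow. The names `SingleMaterializationClient`, `sendAll`, and `connectionTo` are hypothetical helpers, not part of the original code or of Akka HTTP; only `Http().outgoingConnection`, `Source.fromIterator`, `Sink.seq`, and `discardEntityBytes` are library calls. It also assumes the server keeps the connection alive for the whole run.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCode}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Future

object SingleMaterializationClient {

  // Hypothetical helper: sends all requests through ONE materialization of
  // `connection`. If `connection` comes from Http().outgoingConnection, that
  // means one TCP connection instead of one per request.
  def sendAll(
      requests: Iterator[HttpRequest],
      connection: Flow[HttpRequest, HttpResponse, Any]
  )(implicit mat: Materializer): Future[Seq[StatusCode]] =
    Source
      .fromIterator(() => requests)
      .via(connection)
      .map { response =>
        // Drain the response body so the connection can deliver the next response.
        response.discardEntityBytes()
        response.status
      }
      .runWith(Sink.seq) // collect one status per request, in order

  // Building the flow does not connect yet; the connection is opened when
  // sendAll materializes the stream.
  def connectionTo(host: String, port: Int)(
      implicit system: ActorSystem
  ): Flow[HttpRequest, HttpResponse, Any] =
    Http().outgoingConnection(host, port)
}
```

With an implicit `ActorSystem` and `Materializer` in scope, the loop from the question would become something like `sendAll((1 to 200000).iterator.map(i => RequestBuilding.Put("/foo", Foo(i))), connectionTo("0.0.0.0", 9000))`. Akka HTTP also offers pooled client APIs such as `Http().singleRequest`, which may be a better fit if requests are issued from many places.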

Sean Vieira
  • Thanks, but shouldn't the connection close? As you can see, the response is very quick, so the connection should close. If I send 10k requests, the number of open connections (from `lsof -i tcp:9000 | wc -l`) reaches only 2-4, but with 100k it reaches more than 6500 (!!!). I will also check your suggestion – igx Sep 07 '17 at 04:21
  • Solved - changed the code to `def apiRequest(requests: Seq[HttpRequest]): Future[HttpResponse] = Source.fromIterator(() => requests.toIterator).via(apiConnectionFlow).runWith(Sink.head)` – igx Sep 07 '17 at 10:44