
Using spray 1.3.2 with akka 2.3.6 (akka is used only for spray).
I need to read huge files and make an HTTP request for each line.
I read the files line by line with an iterator and make a request for each item. It runs successfully for some of the lines, but at some point it starts to fail with:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://default/user/IO-HTTP#-35162984]] after [60000 ms].
At first I thought I was overloading the service, so I set "spray.can.host-connector.max-connections" to 1. It ran much slower, but I got the same errors.

Here is the code:

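import akka.actor.ActorSystem
import scala.concurrent.Future
import spray.client.pipelining._
import spray.http._
import spray.http.MediaTypes._

// sendReceive needs an implicit ActorRefFactory and ExecutionContext in scope.
implicit val system = ActorSystem()
import system.dispatcher

// Register a custom media type for EDN request bodies.
val EdnType = register(
  MediaType.custom(
    mainType = "application",
    subType = "edn",
    compressible = true,
    binary = false,
    fileExtensions = Seq("edn")))

// Assumes an implicit unmarshaller for PipelineResponse is in scope (e.g. via spray-json).
val pipeline = (
  addHeader("Accept", "application/json")
  ~> sendReceive
  ~> unmarshal[PipelineResponse])

def postData(data: String): Future[PipelineResponse] = {
  val request = Post(pipelineUrl).withEntity(HttpEntity(EdnType, data))
  pipeline(request)
}

dataLines.map { d =>
  val f = postData(d)
  f.onFailure { case e => println("Error - " + e) } // This is where the errors are displayed
  f.map { p => someMoreLogic(d, p) }
}

aggregateResults(dataLines)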

I do it this way because I don't need the entire data set, just some aggregations.

How can I solve this and keep it entirely async?

roterl
  • That's the timeout of `sendReceive`. See https://github.com/spray/spray/blob/master/spray-client/src/main/scala/spray/client/pipelining.scala You can adapt it by providing another implicit `Timeout` in scope, e.g. `implicit val timeout = Timeout(120.seconds)`. – jrudolph Nov 16 '14 at 16:45
  • @jrudolph, does each call to the pipeline start the timeout timer at the moment of the call? Doesn't that mean it creates an actor for the request at that time? Is there any way of calling it that would create the actor only when the request is actually made? – roterl Nov 16 '14 at 17:30
  • Ask timeout is implemented via firstCompletedOf, the timer starts at call site. See my answer below. – ponythewhite Nov 16 '14 at 19:54
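jrudolph's suggestion relies on a Scala feature: an implicit parameter can have a default value, which is used only when no implicit of that type is found at the call site. spray-client's `sendReceive` appears to declare its `Timeout` this way with a 60-second default, which would match the `[60000 ms]` in the error. The dependency-free sketch below only mimics that shape; `Timeout` here is a hypothetical stand-in case class (not `akka.util.Timeout`) and `sendReceiveTimeout` is a made-up method.

```scala
import scala.concurrent.duration._

// Hypothetical stand-in for akka.util.Timeout so the sketch needs no Akka dependency.
final case class Timeout(duration: FiniteDuration)

object PipelineLike {
  // Hypothetical method shaped like spray's sendReceive: the implicit parameter
  // has a default. An implicit Timeout in scope at the call site takes priority;
  // if none is found, the 60-second default is used.
  def sendReceiveTimeout(implicit futureTimeout: Timeout = Timeout(60.seconds)): FiniteDuration =
    futureTimeout.duration
}

object ImplicitTimeoutDemo {
  // No implicit Timeout in scope here, so the default applies.
  def withDefault(): FiniteDuration = PipelineLike.sendReceiveTimeout

  // Same call, but with the suggested implicit in scope.
  def withOverride(): FiniteDuration = {
    implicit val timeout: Timeout = Timeout(120.seconds)
    PipelineLike.sendReceiveTimeout
  }
}
```

Raising the timeout only buys time, though: if requests are all queued up front, a long enough input will still exceed any fixed limit.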

1 Answer


Akka's ask timeout is implemented via `firstCompletedOf`, so the timer starts when the ask is initiated.

What you seem to be doing is spawning a Future for each line (during the `map`), so all your calls execute at nearly the same time. The timeouts start counting when the futures are created, but there are no executor threads left for all the spawned actors to do their work. Hence the asks time out.
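The mechanism described above can be reproduced without spray or Akka. The sketch below is a standard-library analogue, not Akka's actual implementation: a hypothetical `withTimeout` helper races the work against a timer using `Future.firstCompletedOf`, and the timer is armed when the call is made. Ten jobs are launched at once, as `dataLines.map` does, on a pool of only 2 threads; the pool size, the 200 ms job duration and the 500 ms timeout are arbitrary assumptions. Jobs near the back of the queue hit their deadline while still waiting for a thread.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.{Executors, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object AllAtOnce {
  // Daemon timer thread, so it does not keep the JVM alive.
  private val timer = new Timer(true)

  // Hypothetical ask-style helper: races `work` against a timer that is armed
  // right now, at the call site, regardless of when `work` actually gets a thread.
  def withTimeout[T](after: FiniteDuration)(work: Future[T])
                    (implicit ec: ExecutionContext): Future[T] = {
    val expired = Promise[T]()
    timer.schedule(new TimerTask {
      def run(): Unit = expired.tryFailure(new TimeoutException(s"timed out after $after"))
    }, after.toMillis)
    Future.firstCompletedOf(Seq(work, expired.future))
  }

  // Launches 10 jobs at once (like dataLines.map { d => postData(d) }) on a pool
  // of only 2 threads. Returns (succeeded, timed out).
  def demo(): (Int, Int) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val pool = Executors.newFixedThreadPool(2)
    val workEc = ExecutionContext.fromExecutorService(pool)
    val calls = (1 to 10).map { i =>
      // Each job needs 200 ms of a pool thread; every timer starts immediately.
      withTimeout(500.millis)(Future { Thread.sleep(200); i }(workEc))
    }
    val outcomes = calls.map(f => Try(Await.result(f, 5.seconds)))
    pool.shutdown() // queued jobs still run to completion; they are not cancelled
    (outcomes.count(_.isSuccess), outcomes.count(_.isFailure))
  }
}
```

On a typical run the first few jobs succeed and the rest fail with `TimeoutException`, even though each job individually takes well under the timeout. Note that `firstCompletedOf` only fails the returned future; the timed-out job keeps running on the pool.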

Instead of processing everything "all at once", I would suggest a more flexible approach, somewhat similar to using iteratees or akka-streams: the Work Pulling Pattern (GitHub).

You provide the iterator that you already have as an Epic (the source of work items). Introduce a Worker actor that performs the call and some logic. If you spawn N workers, at most N lines will be processed concurrently (and the processing pipeline may involve multiple steps). This way you ensure that you are not overloading the executors, and the timeouts shouldn't happen.
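A rough standard-library analogue of the Work Pulling Pattern, without actors: a shared iterator plays the role of the Epic, and each of N "workers" is a Future loop that pulls the next line only after finishing the previous one. `aggregate`, its parameters and the fake `process` function in the demo are hypothetical; in the question, `process` would be something like `postData` followed by `someMoreLogic`. Because nothing starts until a worker is free, each request (and any timer it arms) begins only when there is capacity, and per-worker partial results are folded so the whole data set is never held in memory.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object WorkPulling {
  // Processes every item of `source` with at most `workers` items in flight and
  // folds the per-item results. `zero` must be an identity for `combine`.
  def aggregate[A, S](source: Iterator[A], workers: Int, zero: S)
                     (process: A => Future[S])(combine: (S, S) => S)
                     (implicit ec: ExecutionContext): Future[S] = {
    // The shared iterator acts as the "Epic": workers pull from it one item at a time.
    def pull(): Option[A] = source.synchronized {
      if (source.hasNext) Some(source.next()) else None
    }
    // A worker starts a new item only after its previous item has completed.
    def worker(acc: S): Future[S] = pull() match {
      case Some(item) => process(item).flatMap(r => worker(combine(acc, r)))
      case None       => Future.successful(acc)
    }
    Future.sequence(List.fill(workers)(worker(zero))).map(_.foldLeft(zero)(combine))
  }

  // Demo with fake work: 50 hypothetical lines, 3 workers, result = total length.
  // Returns (total, highest number of items observed in flight at once).
  def demo(): (Int, Int) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val inFlight = new AtomicInteger(0)
    val maxSeen = new AtomicInteger(0)
    val lines = (1 to 50).iterator.map(i => s"line-$i")
    val total = aggregate(lines, workers = 3, zero = 0) { line =>
      Future {
        val now = inFlight.incrementAndGet()
        maxSeen.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
        Thread.sleep(5) // stands in for the HTTP round trip
        inFlight.decrementAndGet()
        line.length
      }
    }(_ + _)
    (Await.result(total, 10.seconds), maxSeen.get)
  }
}
```

The in-flight count never exceeds the number of workers, so the concurrency limit comes from the consumer pulling work rather than from a connection setting such as `max-connections`.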

ponythewhite
  • This code is for a simple tool that has to make a lot of HTTP requests. I thought about using another HTTP client, but then I made the changes to use the Work Pulling Pattern and it works great. – roterl Nov 20 '14 at 16:06
  • Happy to hear that! I'm also using a modified WPP. – ponythewhite Nov 24 '14 at 09:15
  • Could you please demonstrate your statement: `... but there are no executor threads left for all the spawned actors to do their work`. How do you know that? Thanks – Kevin Meredith Jun 14 '16 at 21:35