3

This question is based on a pet project that I did and this SO thread. Inside a Akka HTTP route definition, I start a long-running process, and naturally I want to do that without blocking the user. I'm able to achieve this with the code snippet below:

blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
  throughput = 1
}

complete {
  Try(new URL(url)) match {
    case scala.util.Success(u) => {
      val src = Source.fromIterator(() => parseMovies(u).iterator)

      src
        .via(findMovieByTitleAndYear)
        .via(persistMovies)
        .toMat(Sink.fold(Future(0))((acc, elem) => Applicative[Future].map2(acc, elem)(_ + _)))(Keep.right)
        // run the whole graph on a separate dispatcher
        .withAttributes(ActorAttributes.dispatcher("blocking-io-dispatcher"))
        .run.flatten
        .onComplete {
          _ match {
            case scala.util.Success(n) => logger.info(s"Created $n movies")
            case Failure(t) => logger.error(t, "Failed to process movies")
          }
        }

      Accepted
    }
    case Failure(t) => logger.error(t, "Bad URL"); BadRequest -> "Bad URL"
  }
}

What's the problem then if I've already solved it? The problem is that I'm not sure how to set a timeout. The execution of the graph creates a Future that executes until complete on the dedicated blocking-io-dispatcher. If I add a Await call, the code blocks. Is there a way to put a timeout?

Community
  • 1
  • 1
Abhijit Sarkar
  • 21,927
  • 20
  • 110
  • 219

1 Answers1

3

completionTimeout stage should help here. Example below:

src
    .completionTimeout(5.seconds)
    ...
    .run.flatten
    .onComplete {
        case scala.util.Success(n) => logger.info(s"Created $n movies")
        case Failure(t: TimeoutException) => logger.error(t, "Timed out")
        case Failure(t) => logger.error(t, "Failed to process movies")
    }

Docs reference here.

Stefano Bonetti
  • 8,973
  • 1
  • 25
  • 44
  • 2
    Thanks you; but your code doesn't compile as shown. `completionTimeout` has to be applied before attempting to materialize the graph (`toMat`). I've accepted your answer as is almost correct. – Abhijit Sarkar Jan 16 '17 at 02:17