9

I'm building a micro-service using Play Framework 2.3.x using Scala (I'm a beginner in both) but I can't figure out a way to stream my request body.

Here is the problem:

I need an endpoint /transform where I can receive a huge TSV file that I will parse and render in another format: simple transformation. The problem is that every single command in my controller is ran "too late". It waits to receive the full file before starting the code.

Example:

  def transform = Action.async {
    Future {
      Logger.info("Too late")
      Ok("A response")
    }
  }

I want to be able to read line-by-line the request body during its upload and process already the request without having to wait for the file to be received completely.

Any hint would be welcome.

Cecile
  • 1,553
  • 1
  • 15
  • 26

1 Answers1

15

This answer applies to Play 2.5.x and higher since it uses the Akka streams API that replaced Play's Iteratee-based streaming in that version.

Basically, you can create a body parser that returns a Source[T] that you can pass to Ok.chunked(...). One way to do this is to use Accumulator.source[T] in the body parser. For example, an action that just returned data sent to it verbatim might look like this:

def verbatimBodyParser: BodyParser[Source[ByteString, _]] = BodyParser { _ =>
  // Return the source directly. We need to return
  // an Accumulator[Either[Result, T]], so if we were
  // handling any errors we could map to something like
  // a Left(BadRequest("error")). Since we're not
  // we just wrap the source in a Right(...)
  Accumulator.source[ByteString]
    .map(Right.apply)
}

def stream = Action(verbatimBodyParser) { implicit request =>
  Ok.chunked(request.body)
}

If you want to do something like transform a TSV file you can use a Flow to transform the source, e.g:

val tsvToCsv: BodyParser[Source[ByteString, _]] = BodyParser { req =>

  val transformFlow: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString]
    // Chunk incoming bytes by newlines, truncating them if the lines
    // are longer than 1000 bytes...
    .via(Framing.delimiter(ByteString("\n"), 1000, allowTruncation = true))
    // Replace tabs by commas. This is just a silly example and
    // you could obviously do something more clever here...
    .map(s => ByteString(s.utf8String.split('\t').mkString(",") + "\n"))

  Accumulator.source[ByteString]
    .map(_.via(transformFlow))
    .map(Right.apply)
}

def convert = Action(tsvToCsv) { implicit request =>
  Ok.chunked(request.body).as("text/csv")
}

There may be more inspiration in the Directing the Body Elsewhere section of the Play docs.

Mikesname
  • 8,781
  • 2
  • 44
  • 57
  • Thanks! I already figured out it is an issue to make the things I want working on Play 2.3.x. But I will probably try this just for the fun on a 2.5.x. – Cecile Jul 20 '16 at 19:26