
Streaming data out of Play is quite easy.
Here's a quick example of how I intend to do it (please let me know if I'm doing it wrong):

def getRandomStream = Action { implicit req =>

  import scala.util.{Random, Success, Failure}
  import scala.concurrent.{blocking, Future, ExecutionContext}
  import ExecutionContext.Implicits.global
  import play.api.libs.iteratee.Concurrent

  def getSomeRandomFutures: List[Future[String]] = {
    for {
      i <- (1 to 10).toList
      r = Random.nextInt(30000)
    } yield Future {
      blocking {
        Thread.sleep(r)
      }
      s"after $r ms. index: $i.\n"
    }
  }

  val enumerator = Concurrent.unicast[Array[Byte]] {
    (channel: Concurrent.Channel[Array[Byte]]) => {
      getSomeRandomFutures.foreach {
        _.onComplete {
          case Success(x: String) => channel.push(x.getBytes("utf-8"))
          case Failure(t) => channel.push(t.getMessage.getBytes("utf-8"))
        }
      }
      //following future will close the connection
      Future {
        blocking {
          Thread.sleep(30000)
        }
      }.onComplete {
        case Success(_) => channel.eofAndEnd()
        case Failure(t) => channel.end(t)
      }
    }
  }
  new Status(200).chunked(enumerator).as("text/plain;charset=UTF-8")
}

Now, if you get served by this action, you'll get something like:

after 1757 ms. index: 10.
after 3772 ms. index: 3.
after 4282 ms. index: 6.
after 4788 ms. index: 8.
after 10842 ms. index: 7.
after 12225 ms. index: 4.
after 14085 ms. index: 9.
after 17110 ms. index: 1.
after 21213 ms. index: 2.
after 21516 ms. index: 5.

where every line is received after the random time has passed.
Now, imagine I want to preserve this simple example when streaming data from the server to the client, but I also want to support full streaming of data from the client to the server.

So, let's say I'm implementing a new BodyParser that parses the input into a List[Future[String]]. This means my Action could now look something like this:

def getParsedStream = Action(myBodyParser) { implicit req =>

  val xs: List[Future[String]] = req.body

  val enumerator = Concurrent.unicast[Array[Byte]] {
    (channel: Concurrent.Channel[Array[Byte]]) => {
      xs.foreach {
        _.onComplete {
          case Success(x: String) => channel.push(x.getBytes("utf-8"))
          case Failure(t) => channel.push(t.getMessage.getBytes("utf-8"))
        }
      }
      //again, following future will close the connection
      Future.sequence(xs).onComplete {
        case Success(_) => channel.eofAndEnd()
        case Failure(t) => channel.end(t)
      }
    }
  }
  new Status(200).chunked(enumerator).as("text/plain;charset=UTF-8")
}

But this is still not what I want to achieve. In this case, I'll get the body from the request only after the request has finished and all the data has been uploaded to the server. But I want to start serving the request as I go. A simple demonstration would be to echo any received line back to the user, while keeping the connection alive.

So here's my current thought:
what if my BodyParser returned an Enumerator[String] instead of a List[Future[String]]?
In that case, I could simply do the following:

def getParsedStream = Action(myBodyParser) { implicit req =>
  new Status(200).chunked(req.body).as("text/plain;charset=UTF-8")
}

So now I'm facing the problem of how to implement such a BodyParser. To be more precise about what exactly I need:
I need to receive chunks of data to parse as strings, where every string ends in a newline \n (though a chunk may contain multiple lines...). Every "chunk of lines" would be processed by some computation (irrelevant to this question), which would yield a String, or better, a Future[String], since this computation may take some time. The resulting strings of this computation should be sent to the user as they become ready, much like the random example above, and this should happen simultaneously while more data is being sent.
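Independent of the Play API, the chunk-to-lines step such a parser needs can be sketched in plain Scala (`splitLines` is a hypothetical helper, not a library function): each incoming `Array[Byte]` chunk is appended to a carried-over partial line, complete lines are emitted, and the trailing partial line is kept for the next chunk.

```scala
// Splits a byte chunk into complete lines, carrying over any trailing
// partial line to be prepended to the next chunk.
// Returns (newCarry, completedLines).
def splitLines(carry: String, chunk: Array[Byte]): (String, List[String]) = {
  val s = carry + new String(chunk, "utf-8")
  // split with limit -1 keeps a trailing empty/partial piece,
  // so parts.last is always the (possibly empty) unfinished line
  val parts = s.split("\n", -1)
  (parts.last, parts.init.toList)
}
```

A real BodyParser would fold this over the incoming chunks, kicking off the Future[String] computation for each completed line and pushing results into the output channel as they arrive.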

I have looked into several resources trying to achieve this, but have been unsuccessful so far, e.g. scalaQuery play iteratees: it seems this author is doing something similar to what I want, but I couldn't translate it into a usable example (and the API differences between Play 2.0 and Play 2.2 don't help...).

So, to sum it up: is this the right approach (considering I don't want to use WebSockets)? And if so, how do I implement such a BodyParser?

EDIT:

I have just stumbled upon a note in the Play documentation regarding this issue, saying:

Note: It is also possible to achieve the same kind of live communication the other way around by using an infinite HTTP request handled by a custom BodyParser that receives chunks of input data, but that is far more complicated.

So I'm not giving up, now that I know for sure this is achievable.

gilad hoch
  • I'm not sure HTTP even supports what you are trying to do, nor does it look like it would be worth doing - everything I've read suggests browsers won't process the response until after they've finished uploading the complete request. Why not break this into separate requests with a Keep-Alive connection? Or use two separate connections, e.g. an AJAX request uploading the data and a forever JSONP iframe bringing data down? – wingedsubmariner Dec 11 '13 at 15:19
  • in my case, the client isn't (usually) a web browser, so I don't really care whether browsers support it. and anyway, I'm talking about streaming large amounts of data, which can be processed as we go. I would never want all the data to be stored in my server's memory, nor on disk. and it is important, due to the nature of my service, to do it as one HTTP request. – gilad hoch Dec 11 '13 at 15:40
  • This looks like an XY Problem, what is it you are really trying to do? – wingedsubmariner Dec 11 '13 at 17:08
  • I'll describe one feature: client input is subject URLs, one URL per line. server output is N-Triple lines. each URL the client sends is mapped to multiple N-Triples describing the subject. input and output may contain millions of lines. there are other reasons why it has to be a single HTTP connection (e.g. other services already use it, and I can't break the API...) – gilad hoch Dec 11 '13 at 19:15
  • You can keep it a single connection, and use multiple requests. That really is the only way that HTTP supports, and it will be much better to do this than to try to hack your own extensions to HTTP. For example, you can RESTfully give errors for each request separately. You can use a single connection, and not only handle these requests, but also mix in the others you mention your API supports. With HTTP 2.0, those requests could be serviced concurrently and answered out of order. Your hack with both a chunked request and response can do none of these things. – wingedsubmariner Dec 11 '13 at 19:41

2 Answers


What you want to do isn't quite possible in Play.

The problem is that Play can't start sending a response until it has completely received the request. So you can either receive the request in its entirety and then send a response, as you have been doing, or you can process requests as you receive them (in a custom BodyParser), but you still can't reply until you've received the request in its entirety (which is what the note in the documentation was alluding to - although you can send a response in a different connection).

To see why, note that an Action is fundamentally a (RequestHeader) => Iteratee[Array[Byte], SimpleResult]. At any time, an Iteratee is in one of three states: Done, Cont, or Error. It can only accept more data if it's in the Cont state, but it can only return a value when it's in the Done state. Since that return value is a SimpleResult (i.e., our response), there's a hard cutoff between receiving data and sending data.
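To make this concrete, here is a toy model of the three states (not Play's actual classes, just an illustration of the shape): only Done holds a result, so no response value can be extracted while the iteratee is still consuming input in Cont.

```scala
// Toy model of the three iteratee states (illustration only):
sealed trait Step[+A]
case class Done[A](result: A) extends Step[A]            // finished: carries the result
case class Cont[A](k: String => Step[A]) extends Step[A] // can still accept input
case class Error(msg: String) extends Step[Nothing]      // failed

// A result can only be read out of the Done state.
def resultOf[A](s: Step[A]): Option[A] = s match {
  case Done(a) => Some(a)
  case _       => None
}
```

Since the SimpleResult (and with it the Enumerator feeding the response body) only exists once the iteratee reaches Done, Play cannot start writing the response while it is still in Cont consuming the request body.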

According to this answer, the HTTP standard does allow a response before the request is complete, but most browsers don't honor the spec, and in any case Play doesn't support it, as explained above.

The simplest way to implement full-duplex communication in Play is with WebSockets, but we've ruled that out. If server resource usage is the main reason for the change, you could try parsing your data with play.api.mvc.BodyParsers.parse.temporaryFile, which will save the data to a temporary file, or play.api.mvc.BodyParsers.parse.rawBuffer, which will overflow to a temporary file if the request is too large.

Otherwise, I can't see a sane way to do this using Play, so you may want to look at using another web server.

James_pic
  • _"...the HTTP standard does allow a response before the request is complete, but most browsers don't honor the spec..."_ since the client is not a browser in my case, that's not a problem. _"and in any case Play doesn't support it."_ - are you sure? I couldn't find anything regarding that. anyway, what I was thinking of, is not letting the `Iteratee` reach it's `Done` state before replying, and somehow use the connection to start streaming data back as it is being parsed. if it is not possible, I would appreciate a reference. thanks. – gilad hoch Feb 02 '14 at 08:52
  • The reason that this is impossible, is that the response is produced by the Iteratee once it's in its Done state. If you look at the API for Iteratee, there's no way to get a result out of it when it's in any other state. States are represented by 3 case classes, Cont, Done and Err, and only Done includes a result. This is the sole and only way that a response can be produced. – James_pic Feb 02 '14 at 12:06
  • Moreover, if you look at the implementation of Play's web server (in the play.core classes), you'll see that it stops processing data once the Iteratee is in the Done state, so if you were thinking of tricking it somehow (like overriding fold in your iteratee, to call both the Done and Cont callbacks), that won't work either. – James_pic Feb 02 '14 at 12:06
  • since I want to do something "unorthodox", it's fine by me to have a "hacky" solution, and not go "the standard" way. if there's a possibility to somehow get the connection in the `BodyParser` and start sending responses as we go, i'll be OK with it. another "hacky" option, would be to somehow leave the `Iteratee` in the `Cont` mode, so it would still consume incoming data, but to eagerly invoke the action, with an `Enumerator` emitting the parsed chunks that are available as the request body. – gilad hoch Feb 03 '14 at 13:18
  • The fundamental issue is that the *only* way you can provide an `Enumerator` to Play is by having your `Iteratee` return a `SimpleResult`, and the *only* way that this can happen is if your `Iteratee` is in the `Done` state. I did consider a hacky option like the one you're proposing - `Iteratee` signals its state by calling one of three callbacks, so I wondered if it would be possible to break the `Iteratee` contract by calling both the `Done` and `Cont` callbacks. Unfortunately, this wouldn't work either, for the reasons outlined in my previous comment. – James_pic Feb 04 '14 at 10:17
  • Lines 65-72 of https://github.com/playframework/playframework/blob/9206bea8c9c88acdc6786ebb2554f081396e8f6a/framework/src/play/src/main/scala/play/core/server/netty/RequestBodyHandler.scala#L65 demonstrate that Play will stop putting data into the `Iteratee` once it has a `Result`, even if you've somehow tricked it into returning a value from the `Cont` state. If you wanted to try to hack around this limitation, you would, at a bare minimum, have to run a modified version of Play which didn't do this. – James_pic Feb 04 '14 at 10:25
  • OK, I guess it really is not possible. still, it's weird that they wrote the note suggesting it is possible in the Play documentation. anyway, I did not get what I wanted, but it's a waste if no one gets the bounty... ;) – gilad hoch Feb 05 '14 at 07:43
  • The note in the docs is correct, it just doesn't allow for your use case. IIRC, the note is in the section on Comet. Comet allows the server to send a stream of messages to the client. "The other way round" is for the client to send a stream of messages to the server, which is also possible by creatively using BodyParsers. What's not possible in Play, is for both of these things to happen at the same time (short of opening two connections, one for each direction). – James_pic Feb 05 '14 at 13:33

"Streaming data in and out simultaneously on a single HTTP connection in play"

I haven't finished reading all of your question, nor the code, but what you're asking to do isn't available in HTTP. That has nothing to do with Play.

When you make a web request, you open a socket to a web server and send "GET /file.html HTTP/1.1\n[optional headers]\n[more headers]\n\n"

You get a response after (and only after) you have completed your request (optionally including a request body as part of the request). When and only when the request and response are finished, in HTTP 1.1 (but not 1.0) you can make a new request on the same socket (in http 1.0 you open a new socket).

It's possible for the response to "hang" ... this is how web chats work. The server just sits there, hanging onto the open socket, not sending a response until someone sends you a message. The persistent connection to the web server eventually provides a response when/if you receive a chat message.

Similarly, the request can "hang." You can start to send your request data to the server, wait a bit, and then complete the request when you receive additional user input. This mechanism provides better performance than continually creating new http requests on each user input. A server can interpret this stream of data as a stream of distinct inputs, even though that wasn't necessarily the initial intention of the HTTP spec.

HTTP does not support a mechanism to receive part of a request, then send part of a response, then receive more of a request. It's just not in the spec. Once you've begun to receive a response, the only way to send additional information to the server is to use another HTTP request. You can use one that's already open in parallel, or you can open a new one, or you can complete the first request/response and issue an additional request on the same socket (in 1.1).

If you must have asynchronous io on a single socket connection, you might want to consider a different protocol other than HTTP.

nairbv
  • Technically, you can send another request before a response is received, as in request pipelining – James_pic Feb 02 '14 at 12:05
  • Interesting, I didn't know about request pipelining. According to wikipedia "In all other browsers [other than opera] HTTP pipelining is disabled or not implemented." It also still doesn't seem like this would enable the requested functionality. "limitation of HTTP 1.1 still applies: the server must send its responses in the same order that the requests were received ... connection remains first-in-first-out." The OP "in this case, I’ll get the body from the request only after the request was finished, and all the data was uploaded to the server. but I want to start serving request as I go." – nairbv Feb 03 '14 at 21:49
  • Yes, request pipelining is largely unrelated to what the OP is trying to do. Request pipelining is sort-of the converse of this, in that it lets the client send a request before getting a reply, rather than the server sending a reply before receiving the full request. – James_pic Feb 04 '14 at 09:54