
The aim is to stream data from a database, perform some computation on each chunk of data (this computation returns a Future of a case class), and send the result to the user as a chunked response. Currently I am able to stream the data and send the response without performing any computation. However, I am unable to perform this computation and then stream the result.

This is the route I have implemented.

def streamingDB1 =
path("streaming-db1") {
  get {
    val src = Source.fromPublisher(db.stream(getRds))
    complete(src)
  }
}

The function getRds returns the rows of a table mapped into a case class (using Slick). Now consider a function compute that takes each row as input and returns a Future of another case class, something like

def compute(x: Tweet) : Future[TweetNew] = ?

How can I apply this function to src and send the chunked response (as a stream) of this computation to the user?

Jeffrey Chung
user3294786

2 Answers


You could transform the source using mapAsync:

val src =
  Source.fromPublisher(db.stream(getRds))
        .mapAsync(parallelism = 3)(compute)

complete(src)

Adjust the level of parallelism as needed.
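Also note that complete(src) with a Source of case class elements needs an implicit marshaller for the element type in scope. A minimal sketch, assuming spray-json support (the akka-http-spray-json module) and a hypothetical one-field TweetNew; substitute your actual case class and format:

```scala
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._

// Hypothetical shape; use your real TweetNew here.
final case class TweetNew(str: String)

// JSON format so each element of the stream can be marshalled.
implicit val tweetNewFormat = jsonFormat1(TweetNew)

// Lets complete(src) render a Source[TweetNew, _] as a chunked JSON array.
implicit val jsonStreamingSupport = EntityStreamingSupport.json()
```

With these implicits in scope, the mapAsync source above can be passed to complete directly.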


Note that you might need to configure a few settings as mentioned in the Slick documentation:

Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n) (with the desired page size n) and .transactionally for proper streaming.

So if you're using PostgreSQL, for example, then your Source might look something like the following:

val src =
  Source.fromPublisher(
    db.stream(
      getRds.withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 10
      ).transactionally
    )
  ).mapAsync(parallelism = 3)(compute)
Jeffrey Chung
  • This does not work. I run the curl command to hit the endpoint. However the connection gets closed. – user3294786 Dec 27 '17 at 12:51
    @user3294786 sounds like it's not properly waiting for data before closing – stan Mar 20 '18 at 20:25
  • @StanislavPalatnik likely is onto something here; I suggest adding a .log() stage to see the elements are actually being sent as expected (and not just a completion), make sure to set akka.loglevel = DEBUG as well – Konrad 'ktoso' Malawski Mar 22 '18 at 00:42
  • @user3294786: You marked this as the answer. So has it worked for you now? – stan Mar 29 '18 at 01:03
  • @StanislavPalatnik Yes it worked. I tried this on a different dataset and it worked perfectly. Need to debug why it was failing earlier. – user3294786 Mar 30 '18 at 13:37

You need a way to marshal TweetNew, and note that if you send a chunk of length 0, the client may close the connection.

This code works with curl:

import akka.NotUsed
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

case class TweetNew(str: String)

def compute(string: String): Future[TweetNew] = Future {
  TweetNew(string)
}

val route = path("hello") {
  get {
    // Map each element asynchronously, then render it as a line of text.
    val byteString: Source[ByteString, NotUsed] = Source(List("t1", "t2", "t3"))
      .mapAsync(2)(compute)
      .map(tweet => ByteString(tweet.str + "\n"))
    complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, byteString))
  }
}
  • "If you send a chunk with length 0 client may close connection." -- I believe that Akka is clever enough to filter these out of the stream for you, as they are not representable as HTTP chunks. – Rich Mar 27 '18 at 13:30