
How do I wire up a Source[String, Unit] to a streaming actor?

I think a modified version of StreamingActor from https://gist.github.com/whysoserious/96050c6b4bd5fedb6e33 will work well, but I'm having difficulty connecting the pieces.

Given source: Source[String, Unit] and ctx: RequestContext, I think the modified StreamingActor should be wired up with actorRefFactory.actorOf(fromSource(source, ctx)).

For reference, the gist above:

import akka.actor._
import akka.util.ByteString
import spray.http.HttpEntity.Empty
import spray.http.MediaTypes._
import spray.http._
import spray.routing.{HttpService, RequestContext, SimpleRoutingApp}

object StreamingActor {

  // helper methods

  def fromString(iterable: Iterable[String], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromStringAndCharset(iterable: Iterable[String], ctx: RequestContext, charset: HttpCharset): Props = {
    // use the requested charset instead of silently falling back to UTF-8
    fromHttpData(iterable.map(s => HttpData(s, charset)), ctx)
  }
  def fromByteArray(iterable: Iterable[Array[Byte]], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromByteString(iterable: Iterable[ByteString], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromHttpData(iterable: Iterable[HttpData], ctx: RequestContext): Props = {
    Props(new StreamingActor(iterable, ctx))
  }

  // initial message sent by StreamingActor to itself
  private case object FirstChunk

  // confirmation that given chunk was sent to client
  private case object ChunkAck

}

class StreamingActor(chunks: Iterable[HttpData], ctx: RequestContext) extends Actor with HttpService with ActorLogging {

  import StreamingActor._

  def actorRefFactory = context

  val chunkIterator: Iterator[HttpData] = chunks.iterator

  self ! FirstChunk

  def receive = {

    // send first chunk to client
    case FirstChunk if chunkIterator.hasNext =>
      val responseStart = HttpResponse(entity = HttpEntity(`text/html`, chunkIterator.next()))
      ctx.responder ! ChunkedResponseStart(responseStart).withAck(ChunkAck)

    // data stream is empty. Respond with Content-Length: 0 and stop
    case FirstChunk =>
      ctx.responder ! HttpResponse(entity = Empty)
      context.stop(self)

    // send next chunk to client  
    case ChunkAck if chunkIterator.hasNext =>
      val nextChunk = MessageChunk(chunkIterator.next())
      ctx.responder ! nextChunk.withAck(ChunkAck)

    // all chunks were sent. stop.  
    case ChunkAck =>
      ctx.responder ! ChunkedMessageEnd
      context.stop(self)

    // any other message is left unhandled
    case x => unhandled(x)
  }

}
ic3b3rg
Is using an Actor to send multiple HttpResponse values a requirement? I think you would be much better served by sending one HttpResponse with HttpEntity.Chunked; see http://stackoverflow.com/questions/33123280/akka-http-send-continuous-chunked-http-response-stream – Ramón J Romero y Vigil Nov 05 '15 at 13:17

1 Answer


I think your use of a StreamingActor over-complicates the underlying problem you are trying to solve. Further, for a single HttpRequest the StreamingActor in the question sends the responder a whole series of messages (a ChunkedResponseStart, then one MessageChunk per remaining chunk, then a ChunkedMessageEnd), waiting for an ack after each one. That machinery is unnecessary, because you can simply return one HttpResponse whose entity is an HttpEntity.Chunked built from your data stream Source.
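Applied directly to the question's source: Source[String, Unit], a minimal sketch might look like the following. It assumes the server is built on akka-http, as in the rest of this answer, rather than spray; SourceToResponse and chunkedResponse are hypothetical names. The parameter is typed Source[String, Any] so it accepts a Source whatever its materialized value (Unit in older Akka Streams, NotUsed in newer ones).

```scala
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse}
import akka.stream.scaladsl.Source

object SourceToResponse {
  // Hypothetical helper: wraps an existing Source[String, _] in ONE chunked
  // HttpResponse. No actor and no manual ack handling are involved; the
  // streaming infrastructure applies back-pressure between source and client.
  def chunkedResponse(source: Source[String, Any]): HttpResponse = {
    val chunks = source
      .filter(_.nonEmpty) // an HttpEntity.Chunk may not carry empty data
      .map(s => HttpEntity.ChunkStreamPart(s)) // String is encoded as UTF-8
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain(UTF-8)`, chunks))
  }
}
```

Building the response does not run the stream; the source is only materialized when the server writes the entity to the client.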

General Concurrency Design

Actors are for state, e.g. maintaining a running counter between connections. Even then, an Agent covers a lot of ground with the additional benefit of compile-time type checking (unlike Actor.receive, which accepts Any and so makes the dead letter mailbox your only type checker, and only at runtime).
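For the running-counter example, a minimal Agent sketch could look like this. It assumes the akka-agent module from the Akka 2.3/2.4 era (the module was deprecated in Akka 2.5 and removed in 2.6); ConnectionCounter and its methods are hypothetical names.

```scala
import akka.agent.Agent
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ConnectionCounter {
  // Agent[Int] only accepts Int => Int updates; sending the wrong type of
  // update is a compile error rather than an unhandled message at runtime.
  val connections: Agent[Int] = Agent(0)

  // Fire-and-forget update, queued and applied asynchronously.
  def connectionOpened(): Unit = connections.send(_ + 1)

  // alter returns a Future of the new value once this update is applied.
  def connectionOpenedAndWait(): Int =
    Await.result(connections.alter(_ + 1), 1.second)
}
```

Reads via connections.get are immediate and never block, which is the main convenience over asking an actor for its current count.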

Concurrent computation, not state, should be handled with (in order):

  1. Futures as a first consideration: composable, compile-time type checked, and the best choice for most cases.

  2. Akka Streams: composable and compile-time type checked, and very useful, but there is a lot of overhead resulting from the convenient back-pressure functionality. Streams are also how HttpResponse entities are formed, as demonstrated below.
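To illustrate point 1, here is a small, standard-library-only sketch of composing Futures. FutureComposition, countLines, and totalLines are hypothetical names chosen for the example; the point is that every step carries a static type, so mismatches fail at compile time.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureComposition {
  // Each computation is typed: a Future[Int] cannot be mistaken for anything else.
  def countLines(text: String): Future[Int] = Future(text.split("\n").length)

  // Both Futures are started before the for-comprehension, so they run
  // concurrently; the for-comprehension only combines their results.
  def totalLines(a: String, b: String): Future[Int] = {
    val fa = countLines(a)
    val fb = countLines(b)
    for { x <- fa; y <- fb } yield x + y
  }
}
```

For example, Await.result(FutureComposition.totalLines("x\ny", "z"), 1.second) evaluates to 3.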

Streaming CSV Files

Your underlying question is how to stream a CSV file to an HTTP client using Streams. You can begin by creating a data Source and embedding it within an HttpResponse:

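import akka.util.ByteString
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse}
import akka.stream.scaladsl.Source

// getLines() strips line terminators, so each chunk re-adds "\n" below.
// (For brevity the file handle is not explicitly closed here.)
def lines(): Iterator[String] = scala.io.Source.fromFile("DataFile.csv").getLines()

// Source(() => iterator) is the akka-stream 1.0 factory; later versions name it
// Source.fromIterator. The file is re-read each time the stream is materialized.
def chunkSource: Source[HttpEntity.ChunkStreamPart, Unit] =
  Source(() => lines())
    .map(line => ByteString(line + "\n"))
    .map(bytes => HttpEntity.ChunkStreamPart(bytes))

def httpFileResponse: HttpResponse =
  HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, chunkSource))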

You can then return this response for matching requests:

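import akka.http.scaladsl.model.{HttpMethods, HttpRequest, HttpResponse, Uri}

// The explicit type lets the compiler accept the { case ... } literal.
val fileRequestHandler: PartialFunction[HttpRequest, HttpResponse] = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/csvFile"), _, _, _) => httpFileResponse
}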

Then embed the fileRequestHandler into your server routing logic.
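As a rough sketch of that last step, assuming the akka-http low-level server API of the same era (ActorMaterializer and Http().bindAndHandleSync, both deprecated in later releases): CsvServer, the in-memory CSV body standing in for DataFile.csv, and port 8080 are all hypothetical. Because bindAndHandleSync expects a total HttpRequest => HttpResponse, the partial handler is combined with a 404 fallback via orElse.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods.GET
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object CsvServer {
  // Hypothetical stand-in for fileRequestHandler; serves a fixed body
  // instead of reading DataFile.csv so the sketch is self-contained.
  val fileRequestHandler: PartialFunction[HttpRequest, HttpResponse] = {
    case HttpRequest(GET, Uri.Path("/csvFile"), _, _, _) =>
      val chunks = Source(List("a,b\n", "1,2\n")).map(s => HttpEntity.ChunkStreamPart(s))
      HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain(UTF-8)`, chunks))
  }

  // Fallback for every other request, making the combined handler total.
  val notFound: PartialFunction[HttpRequest, HttpResponse] = {
    case _ => HttpResponse(StatusCodes.NotFound)
  }

  val handler: HttpRequest => HttpResponse = fileRequestHandler orElse notFound

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("csv-server")
    implicit val materializer = ActorMaterializer()
    // Binds the synchronous handler; the chunked entity is streamed to clients.
    Http().bindAndHandleSync(handler, "localhost", 8080)
  }
}
```

Keeping the handler as a plain function value also makes it easy to unit-test without binding a port.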

Ramón J Romero y Vigil