33

akka-http represents a file uploaded using multipart/form-data encoding as Source[ByteString, Any]. I need to unmarshal it using a Java library that expects an InputStream.

How can a Source[ByteString, Any] be turned into an InputStream?

kostya
  • I did not downvote, but my guess is there doesn't seem to be any work behind this question. You found a roadblock. What have you tried to get around it? It's nice to see someone put in some time and research first, exhaust those options before coming here and letting us know what you have already tried. – cmbaxter May 28 '15 at 12:18
  • FYI, there seems to be some discussion around this on the akka-user list at https://groups.google.com/forum/#!topic/akka-user/4WvOrFtewQY and also an open issue regarding the same: https://github.com/akka/akka/issues/17338 – Biswanath May 28 '15 at 12:49
  • 4
    I am curious about downvotes too. I think the question is valid given that the solution is neither provided by the library out of the box nor described in the documentation. The answer was helpful as well and will hopefully help other people. – kostya May 29 '15 at 01:12

3 Answers

25

As of version 2.x you can achieve this with the following code:

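import java.io.InputStream
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import akka.stream.scaladsl.StreamConverters
...
// Needs an implicit materializer in scope; the duration is the read timeout.
val inputStream: InputStream = entity.dataBytes
  .runWith(
    StreamConverters.asInputStream(FiniteDuration(3, TimeUnit.SECONDS))
  )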

See: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.1/scala/migration-guide-1.0-2.x-scala.html

Note: this was broken in version 2.0.2 and fixed in 2.4.2.

Bennie Krijger
  • It is broken, however, as of 2.0.2: https://github.com/akka/akka/issues/19392 and the solution from the accepted answer can deadlock due to fusing (an unlikely scenario happened! ;) – kostya Jan 18 '16 at 16:23
  • @kostya thanks for pointing out this problem, however it seems like the syntax remains the same after the fix: https://github.com/akka/akka/pull/19575 – Bennie Krijger Jan 25 '16 at 09:24
  • I agree, once the bug is fixed this will be the preferred solution. Especially given that the approach from the currently accepted answer doesn't always work with 2.0 – kostya Jan 25 '16 at 15:24
  • It is still broken in 2.0.3! The fix seems to be merged to master (akka 2.4) but I haven't tested it. – kostya Feb 17 '16 at 05:11
  • If you wanted an Array[Byte] as the result, would you use InputStream for that or is there another way? – lisak Jul 12 '17 at 13:30
  • @lisak in that case you could do data.runFold(ByteString.empty)(_ ++ _).map(_.toArray), where data is of type Source[ByteString, Any]; the result is a Future[Array[Byte]] – Bennie Krijger Jul 13 '17 at 08:15
  • 3
    will it load entire dataset into memory on materializing? – maks Jan 25 '19 at 16:56
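
A fuller, self-contained sketch of the `StreamConverters.asInputStream` approach from the answer above. It assumes Akka 2.6 or later, where an implicit `ActorSystem` is enough to materialize a stream (older releases need an explicit `ActorMaterializer`). The names `AsInputStreamSketch` and `readAll`, and the in-memory `Source` standing in for `entity.dataBytes`, are made up for illustration. Unlike `runFold`, which collects every chunk before completing, the `InputStream` pulls chunks from the stream as they are read, so the upload does not have to sit in memory all at once. The helper below builds a `String` only to keep the demonstration short.

```scala
import java.io.InputStream
import java.nio.charset.StandardCharsets
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString

object AsInputStreamSketch {
  /** Hypothetical helper: drains the source through a blocking InputStream. */
  def readAll(data: Source[ByteString, Any])(implicit system: ActorSystem): String = {
    // The argument is the read timeout for each blocking read() call.
    val in: InputStream = data.runWith(StreamConverters.asInputStream(3.seconds))
    try {
      // Read byte by byte until end-of-stream (-1); a real Java library
      // would simply be handed `in` instead.
      val bytes = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
      new String(bytes, StandardCharsets.UTF_8)
    } finally in.close()
  }
}
```

A possible way to try it out:

```scala
implicit val system: ActorSystem = ActorSystem("sketch")
val text = AsInputStreamSketch.readAll(Source(List(ByteString("hello "), ByteString("world"))))
system.terminate()
```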
7

You could try using an OutputStreamSink that writes to a PipedOutputStream, and feed that into a PipedInputStream that your other code uses as its input stream. It's a rough idea, but it could work. The code would look like this:

import akka.util.ByteString
import akka.stream.scaladsl.Source
import java.io.PipedInputStream
import java.io.PipedOutputStream
import akka.stream.io.OutputStreamSink
import java.io.BufferedReader
import java.io.InputStreamReader
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer

object PipedStream extends App{
  implicit val system = ActorSystem("flowtest")
  implicit val mater = ActorFlowMaterializer()

  val lines = for(i <- 1 to 100) yield ByteString(s"This is line $i\n")
  val source = Source(lines)

  val pipedIn = new PipedInputStream()
  val pipedOut = new PipedOutputStream(pipedIn)      
  val flow = source.to(OutputStreamSink(() => pipedOut))
  flow.run()

  val reader = new BufferedReader(new InputStreamReader(pipedIn))
  var line:String = reader.readLine
  while(line != null){
    println(s"Reader received line: $line")
    line = reader.readLine
  }           
}
cmbaxter
  • BTW the `PipedOutputStream` javadocs mention that attempting to use both `PipedOutputStream` and `PipedInputStream` from a single thread is not recommended as it may deadlock the thread. Is there any way in akka to ensure that writing and reading happens in different threads? – kostya May 29 '15 at 01:16
  • I think it's unlikely that you'd end up on the same thread. The HTTP system is using an `ExecutionContext` to serve the request that has the multipart form you want to read. This request is being served off one thread from that EC. If you start up another `RunnableFlow` using the same EC you will get another thread so no deadlock there. If you were really worried then you could have your flow that reads the form data use a completely different `ExecutionContext`. – cmbaxter May 29 '15 at 11:51
  • Thank you for the explanation, though I am a bit worried about the word "unlikely" in your answer ;) It either can happen (and will) or not. – kostya Jun 03 '15 at 03:20
  • I say "unlikely" because if it is set up properly it won't happen. But for instance, say you are testing, and you are using the CallingThreadDispatcher for everything, then it will get deadlocked. – cmbaxter Jun 03 '15 at 12:00
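
The deadlock concern raised in the comments above comes from the piped streams themselves, not from Akka. A minimal sketch using only the JDK, with the made-up names `PipedThreads` and `roundTrip`, shows the safe arrangement: the writer runs on its own thread, so a full pipe buffer can never block the thread that is supposed to drain it. It assumes Scala 2.12 or later for the lambda-to-`Runnable` conversion.

```scala
import java.io.{BufferedReader, InputStreamReader, PipedInputStream, PipedOutputStream}
import java.nio.charset.StandardCharsets

object PipedThreads {
  /** Writes `lines` on a dedicated thread and reads them back on the caller's thread. */
  def roundTrip(lines: Seq[String]): List[String] = {
    val pipedIn  = new PipedInputStream()
    val pipedOut = new PipedOutputStream(pipedIn)

    // The writer gets its own thread; write() blocks whenever the pipe buffer is full.
    val writer = new Thread(() => {
      try lines.foreach(l => pipedOut.write((l + "\n").getBytes(StandardCharsets.UTF_8)))
      finally pipedOut.close() // closing signals end-of-stream to the reader
    })
    writer.start()

    val reader = new BufferedReader(new InputStreamReader(pipedIn, StandardCharsets.UTF_8))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
    finally { reader.close(); writer.join() }
  }
}
```

Calling `PipedThreads.roundTrip(Seq("a", "b"))` returns the same lines in order. Doing the writes and the reads on one thread instead would hang as soon as the data outgrew the pipe's internal buffer.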
2

You could extract an iterator from ByteString and then get the InputStream. Something like this (pseudocode):

source.map { data: ByteString =>
  data.iterator.asInputStream
}

Update

A more elaborate sample starting with a Multipart.FormData:

def isSourceFromFormData(formData: Multipart.FormData): Source[InputStream, Any] =
  formData.parts.map { part =>
    part.entity.dataBytes
      .map(_.iterator.asInputStream)
  }.flatten(FlattenStrategy.concat)
  • The problem now is how to convert multiple input streams into a single inputstream – kostya Sep 25 '15 at 13:02
  • You will need to play with a source of sources here. As you mentioned in the question, you're uploading some binaries as parts in a multipart form. So all you have to do is convert the ByteString source of sources (1 source per file) into a single Source[ByteString]. In akka streams this should be done with a flatten operation. Something like this (pseudocode): {source of sources}.flatten(FlattenStrategy.concat). Hope this helps. – Juan José Vázquez Delgado Sep 26 '15 at 16:52
  • It is not that simple. Each file is represented by Source[ByteString] and there could be more than one ByteString per Source. – kostya Sep 27 '15 at 15:20
  • Maybe you'll need to flatten more than once? Anyway, I reckon you have all the information to do this task properly. – Juan José Vázquez Delgado Sep 28 '15 at 17:10
  • The reason I was asking you these questions is that I don't think your suggestion will work. You cannot flatten Source[ByteString] into InputStream. Try to implement your suggestion first and you will see yourself what the difficulties are. – kostya Sep 29 '15 at 10:09
  • I've added an example from Multipart.FormData to Source[InputStream, Any]. – Juan José Vázquez Delgado Sep 30 '15 at 13:10
  • Correct me if I am wrong but this code loads the entire uploaded body part in memory before converting it to an InputStream. This is ok for small files but will become a problem if big files are uploaded. – kostya Sep 30 '15 at 14:13
  • BTW, using `BodyPartEntity.toStrict` is probably easier in cases when it is acceptable to put the whole request in memory. – kostya Sep 30 '15 at 14:26
  • Yes, of course. My sample came from a use case where I needed the whole part in memory (every part is a zip file). I changed the code, removing the folding part. Is it getting closer to your needs? – Juan José Vázquez Delgado Sep 30 '15 at 16:06
  • This is still not usable. Even for a single body part your code can return a source that produces multiple InputStreams. However, the Java API expects one InputStream per file. Merging InputStreams into a single one is a problem that you cannot solve without introducing threads/actors. BTW you don't need to load a zip file in memory in order to start decoding data from it; have a look at ZipInputStream – kostya Sep 30 '15 at 16:51
  • Yes, I think you're right about this sample generating multiple InputStreams. Thanks. – Juan José Vázquez Delgado Oct 01 '15 at 12:57
  • @ViktorKlang, this might actually work. But how to create `Enumeration[InputStream]` from `Source[InputStream, Any]`? – kostya Feb 01 '16 at 03:35
  • 1
    Given your situation I think you should go with source.runWith(StreamConverters.asInputStream) and feed that input stream into your Java API. – Viktor Klang Feb 01 '16 at 12:31
  • I will, once a version with https://github.com/akka/akka/pull/19660 is released ;) – kostya Feb 01 '16 at 15:20
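
The `Enumeration[InputStream]` mentioned in the comments above is presumably the argument type of `java.io.SequenceInputStream`, which concatenates several streams into one. The sketch below shows only that JDK side, for chunks that are already available as an `Iterator`. It does not answer how to obtain such an iterator from a `Source[InputStream, Any]`, and the names `Concat` and `concat` are made up for illustration.

```scala
import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}
import java.nio.charset.StandardCharsets

object Concat {
  /** Presents a sequence of byte chunks as one continuous InputStream. */
  def concat(chunks: Iterator[Array[Byte]]): InputStream = {
    val streams = chunks.map(bytes => new ByteArrayInputStream(bytes): InputStream)
    // SequenceInputStream asks the Enumeration for the next stream lazily,
    // only after the current one is exhausted.
    new SequenceInputStream(new java.util.Enumeration[InputStream] {
      def hasMoreElements: Boolean = streams.hasNext
      def nextElement(): InputStream = streams.next()
    })
  }
}
```

For example, `Concat.concat(Iterator("ab".getBytes(StandardCharsets.UTF_8), "cd".getBytes(StandardCharsets.UTF_8)))` yields a stream that reads as the bytes of "abcd".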