
I'm trying to use the Alpakka S3 connector to do the following:

  • Download a number of files from AWS S3
  • Stream the downloaded files via the Alpakka Zip Archive Flow
  • Upload the zip stream back to an S3 sink

The code I used is something like this:

val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = S3.multipartUpload("my-s3-bucket", "archive.zip")

val sourceList = (1 to 10).map(i => S3.download("my-s3-bucket", s"random$i.png").map {
    case Some((s, m)) => (ArchiveMetadata(s"${UUID.randomUUID()}.png"), s)
})
val source = Source.combine(sourceList.head, sourceList.tail.head, sourceList.tail.tail: _*)(Merge(_))

source
    .via(Archive.zip())
    .to(s3Sink)
    .run()

However, this results in the following error:

Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it.

I suspect this is because the underlying Akka HTTP client used by the S3 connector expects every download response to be consumed before moving on to the next one, but I wasn't able to handle this in a reasonable way without introducing waits/delays. I tried using a queue with bufferSize = 1, but that didn't work either.

I'm fairly new to Akka and Akka Streams.

rogueai
  • We're in the process of figuring this one out. I'll provide a definitive answer when we do. It's probably due to internal buffering of elements, combined with limited connection pool size. You're using the same pool under the hood for uploads and downloads, so in case of contention a download may start before you have the capacity to upload it again. – László van den Hoek Feb 06 '20 at 11:04
  • Eventually, I ended up using the InputStream from a plain AWS SDK GetObjectRequest, until I figure out a way of doing that with Alpakka/Akka HTTP – rogueai Apr 01 '20 at 10:00
  • @LászlóvandenHoek was there any progress on that one? I'm also facing this issue – Adam Szmyd Oct 27 '21 at 09:57
  • @AdamSzmyd we ended up not needing to do any S3-to-S3 streaming, so I don't have an exact answer. I do recall my initial assertion that it was about internal buffering was correct, so you have to take extra care to not eagerly subscribe to the outer `Source` (representing the request) before you're ready to consume the inner one (the response bytes). The `Source.combine` in the original question is too eager. It would have been better to `flatMapConcat` over the individual file names. – László van den Hoek Oct 27 '21 at 11:24
  • @AdamSzmyd I added an answer of my own. – László van den Hoek Oct 27 '21 at 13:21

1 Answer


Let's break down what's happening here by dissecting the return type of S3.download (Scala DSL): Source[Option[(Source[ByteString, NotUsed], ObjectMetadata)], NotUsed].

The outer Source represents the outstanding request. The Option is empty if the file isn't found in the bucket. If it is present, it contains a pair: another Source, which represents the byte contents of the file, and the ObjectMetadata, which represents the metadata of the file you're downloading.
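
To make the nesting concrete, here is a minimal sketch (assuming the Scala DSL of an Alpakka 2.x-era S3.download; the bucket and key are placeholders) of how the element type unpacks:

import akka.NotUsed
import akka.stream.alpakka.s3.ObjectMetadata
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Outer Source: the pending request. Option: whether the key exists.
// Inner Source: the (already "hot") response bytes. ObjectMetadata: the object's metadata.
val file: Source[Option[(Source[ByteString, NotUsed], ObjectMetadata)], NotUsed] =
  S3.download("my-s3-bucket", "random1.png")

val bytes: Source[ByteString, NotUsed] =
  file
    .collect { case Some((data, _)) => data } // skip missing keys, drop the metadata
    .flatMapConcat(identity)                  // subscribing here is what actually reads the response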

What's counter-intuitive is that a Source is usually a cold, stateless, shareable part of a blueprint of some streaming action, only springing to life once it is materialized. For the outer Source, this is the case. The inner Source, however, is uncharacteristically "hot": once the outer Source is materialized and emits an item, that item represents an open HTTP connection that you're supposed to start consuming within (by default) 1 second, or else the Response entity was not subscribed error is raised.
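
For reference, that 1-second window is Akka HTTP's connection-pool setting akka.http.host-connection-pool.response-entity-subscription-timeout. As a sketch (assuming you create the ActorSystem yourself), you can widen it when building the system; note this only buys time and does not remove the need to consume the entity:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Widen the response-entity subscription window for the default connection pool.
// This is a workaround, not a fix: unread response entities still hold pooled connections.
val config = ConfigFactory.parseString(
  "akka.http.host-connection-pool.response-entity-subscription-timeout = 10 seconds"
).withFallback(ConfigFactory.load())

implicit val system: ActorSystem = ActorSystem("alpakka-s3-zip", config)

The rest of this answer instead makes sure each entity is consumed promptly, which addresses the root cause.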

In the original question, Source.combine is called with the Merge(_) strategy, which materializes all of the download requests in parallel. Archive.zip will handle the files sequentially, but if fully consuming the first Source[ByteString] it receives takes longer than 1 second, the second request will time out before its turn comes.

One surefire way of making sure this doesn't happen is to consume the whole inner Source before handing its contents downstream. Consider:

Source(1 to 10)
  .flatMapMerge(4, i => S3.download("my-s3-bucket", s"random$i.png")
    .log("started file download")
    .addAttributes(Attributes.logLevels(onElement = Attributes.LogLevels.Info))
    .flatMapConcat {
      case Some((s, m)) =>
        // for demo purposes, make sure individual downloads take >1 second
        s.delay(2.seconds, DelayOverflowStrategy.backpressure)
          // read entire file contents into a single ByteString
          .reduce(_ ++ _)
          .map(bs => (ArchiveMetadata(s"${UUID.randomUUID()}.png"), Source.single(bs)))
    })
  .log("completed file download")
  .addAttributes(Attributes.logLevels(onElement = Attributes.LogLevels.Info))
  .via(Archive.zip())
  .to(s3Sink)
  .run()

This (tested!) code downloads up to 4 files concurrently (the first parameter to flatMapMerge). Note how the reduce step reads the entire response into memory before passing it on to Archive.zip(). This is not ideal, but it might be acceptable for small files.
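
If the files are too large to buffer on the heap, one possible variation (my sketch here, not part of the tested code above) is to fully consume each response by spooling it to a temporary file and then hand Archive.zip a cold Source that re-reads it from disk. It assumes the same implicit ActorSystem and the s3Sink from the question; bucket and key names are placeholders, and temp-file clean-up is omitted:

import java.nio.file.Files
import java.util.UUID

import akka.NotUsed
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.{FileIO, Source}

import scala.concurrent.ExecutionContext.Implicits.global

Source(1 to 10)
  .flatMapMerge(4, i =>
    S3.download("my-s3-bucket", s"random$i.png")
      .collect { case Some((data, _)) => data }
      .mapAsync(1) { data =>
        // Drain the response entity into a temp file so the pooled
        // HTTP connection is released before Archive.zip gets its turn.
        val tmp = Files.createTempFile("s3-download-", ".png")
        data
          .runWith(FileIO.toPath(tmp))
          .map(_ => (ArchiveMetadata(s"${UUID.randomUUID()}.png"),
                     FileIO.fromPath(tmp).mapMaterializedValue(_ => NotUsed)))
      })
  .via(Archive.zip())
  .to(s3Sink)
  .run()

Because mapAsync(1) completes only after the whole entity has been written to disk, each response is subscribed and drained immediately, so the subscription timeout cannot be hit.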

László van den Hoek