I'm trying to use the Alpakka S3 connector to do the following:
- Download a number of files from AWS S3
- Stream the downloaded files through the Alpakka Zip Archive flow (Archive.zip())
- Upload the resulting zip stream back to S3 via the multipart upload sink
The code I used is something like this:
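import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.{Merge, Sink, Source}
import akka.util.ByteString
import java.util.UUID
import scala.concurrent.Future

val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = S3.multipartUpload("my-s3-bucket", "archive.zip")
val sourceList = (1 to 10).map(i => S3.download("my-s3-bucket", s"random$i.png").map {
  case Some((s, m)) => (ArchiveMetadata(s"${UUID.randomUUID()}.png"), s)
})
val source = Source.combine(sourceList.head, sourceList.tail.head, sourceList.tail.tail: _*)(Merge(_))
source
  .via(Archive.zip())
  .to(s3Sink)
  .run()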
However, this results in the following error:
Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it.
I suspect this happens because Akka HTTP, which the S3 connector uses under the hood, expects each download response entity to be consumed before moving on to the next one. I wasn't able to enforce that in a reasonable way without introducing waits or delays.
I also tried using a queue with bufferSize = 1, but that didn't work either.
I'm fairly new to Akka and Akka Streams.
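To make the suspicion above concrete, here is a minimal, untested sketch of what "consume each response before requesting the next" could look like. It assumes the same S3.download signature as in the code above (a source emitting an Option of the data source plus metadata). The names LazyZipSketch, lazyEntries and zipped are hypothetical and only used for illustration. The idea is to hand Archive.zip() one not-yet-started byte source per file instead of merging ten already-running downloads, so that each request should only be issued when the zip flow subscribes to that entry. Whether this actually avoids the timeout is an assumption, not something verified here.

```scala
import java.util.UUID

import akka.NotUsed
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Hypothetical helper object, not part of Alpakka.
object LazyZipSketch {

  // Builds one (metadata, bytes) pair per key. Each `bytes` source is only a
  // blueprint: the S3 request inside it should not start until something
  // downstream (here Archive.zip()) materializes and pulls it.
  def lazyEntries(
      bucket: String,
      keys: Seq[String]
  ): Source[(ArchiveMetadata, Source[ByteString, Any]), NotUsed] =
    Source(keys.toList).map { key =>
      val bytes: Source[ByteString, Any] =
        S3.download(bucket, key).flatMapConcat {
          // Object found: stream its content.
          case Some((data, _)) => data
          // Object missing: fail the entry instead of throwing a MatchError.
          case None =>
            Source.failed[ByteString](
              new NoSuchElementException(s"s3://$bucket/$key not found")
            )
        }
      (ArchiveMetadata(s"${UUID.randomUUID()}.png"), bytes)
    }

  // Feeds the entries through the zip flow, one file after another.
  def zipped(bucket: String, keys: Seq[String]): Source[ByteString, NotUsed] =
    lazyEntries(bucket, keys).via(Archive.zip())
}
```

With an implicit ActorSystem in scope, this could be wired to the sink from the code above as LazyZipSketch.zipped("my-s3-bucket", (1 to 10).map(i => s"random$i.png")).runWith(s3Sink), which also returns the Future[MultipartUploadResult] instead of dropping it.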