Hi, I'd like to ask for help with a problem: I want to zip the contents of a folder in a GCP bucket, using Alpakka 5.0.0.
I've written the following:
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.Materializer
import akka.http.scaladsl.model.ContentType
import akka.http.scaladsl.model.MediaTypes.`application/zip`
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.alpakka.googlecloud.storage.scaladsl.GCStorage
import akka.stream.scaladsl.Source
import java.util.UUID
import scala.concurrent.ExecutionContext
object GCPZipper extends App {

  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "zip-streamer")
  implicit val materializer: Materializer = Materializer(system)
  implicit val executionContext: ExecutionContext = system.executionContext

  private val bucket = "some-bucket-on-gcp"
  private val folder = "some-folder-on-bucket"
  private val contentType = ContentType(`application/zip`)

  val uuid = UUID.randomUUID().toString

  GCStorage.listBucket(bucket, Some(s"$folder/"))
    .log("zip-streamer - after list")
    .flatMapMerge(1, found => GCStorage.download(found.bucket, found.name).zip(Source.single(found)))
    .log("zip-streamer - after download")
    .collect { case (Some(found), metadata) => (found, metadata) } // to get rid of the Option
    .log("zip-streamer - after collect")
    .map { case (found, metadata) => (ArchiveMetadata(metadata.name), found) }
    .log("zip-streamer - after archive metadata")
    .via(Archive.zip())
    .runWith(GCStorage.resumableUpload(bucket, s"${folder}_${uuid}_result.zip", contentType))
}
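For context, these are the element types flowing through the stream as I understand them from the Alpakka 5.0.0 scaladsl (annotated by hand, so treat them as my reading of the API rather than gospel):

// GCStorage.listBucket(...): Source[StorageObject, NotUsed]
// GCStorage.download(...):   Source[Option[Source[ByteString, NotUsed]], NotUsed] (None when the object is missing)
// after zip + collect:       (Source[ByteString, NotUsed], StorageObject)
// Archive.zip():             Flow[(ArchiveMetadata, Source[ByteString, NotUsed]), ByteString, NotUsed]

So each downloaded file reaches Archive.zip() as an inner Source[ByteString, _] that is only consumed once the archive flow gets to it.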
The problem: the stream downloads, zips, and uploads the 1st file in the bucket, but as soon as it starts processing the 2nd one it fails with this exception:
21:55:54.854 [zip-streamer-akka.actor.default-dispatcher-6] ERROR akka.stream.Materializer - [zip-streamer - after zip] Upstream failed.
java.util.concurrent.TimeoutException: Response entity was not subscribed after 1 second. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /storage/v1/b/some-bucket-on-gcp/o/643ffceb594787623f296844%2Fcog-hillshade-dtm-result.tiff Empty -> 200 OK Default(47874625 bytes)
at akka.http.impl.engine.client.pool.SlotState$WaitingForResponseEntitySubscription.onTimeout(SlotState.scala:313)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Event$.$anonfun$onTimeout$1(NewHostConnectionPool.scala:187)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Event$.$anonfun$event0$1(NewHostConnectionPool.scala:189)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.runOneTransition$1(NewHostConnectionPool.scala:277)
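If I read the error right, the 1 second is akka-http's akka.http.host-connection-pool.response-entity-subscription-timeout (default 1s): the pool has already received the response for the 2nd file, but nothing subscribes to its entity in time. Raising the timeout only postpones the failure rather than fixing it, but for completeness, a sketch of how it could be bumped (the 30s value is arbitrary):

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.config.ConfigFactory

// sketch: raise the pool's entity-subscription timeout before creating the system
val config = ConfigFactory
  .parseString("akka.http.host-connection-pool.response-entity-subscription-timeout = 30s")
  .withFallback(ConfigFactory.load())

implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "zip-streamer", config)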
Beyond the timeout itself, it looks like the request for the 2nd file is fired before the 1st one has fully finished. I would not have expected that, because in the Alpakka implementation the requests are wrapped in Source.lazyFuture:
// taken from GCStorageStream.scala
private def makeRequestSource[T: FromResponseUnmarshaller](request: Future[HttpRequest]): Source[T, NotUsed] =
  Source
    .fromMaterializer { (mat, attr) =>
      implicit val settings = resolveSettings(mat, attr)
      Source.lazyFuture { () =>
        request.flatMap { request =>
          GoogleHttp()(mat.system).singleAuthenticatedRequest[T](request)
        }(ExecutionContexts.parasitic)
      }
    }
    .mapMaterializedValue(_ => NotUsed)
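My current theory: the lazyFuture only defers sending the request, but once the response arrives, the inner Source[ByteString, _] emitted by GCStorage.download is the response entity itself, and Archive.zip() does not pull it until the previous file has been fully written to the archive, so the 2nd entity sits unsubscribed past the timeout. A workaround sketch I'm considering (untested; it drains each object fully into memory before zipping, so it assumes the files fit on the heap):

import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

GCStorage.listBucket(bucket, Some(s"$folder/"))
  .flatMapMerge(1, found => GCStorage.download(found.bucket, found.name).zip(Source.single(found)))
  .collect { case (Some(data), storageObject) => (data, storageObject) }
  // drain each entity as soon as the response arrives, so it is subscribed immediately
  .mapAsync(parallelism = 1) { case (data, storageObject) =>
    data
      .runWith(Sink.fold(ByteString.empty)(_ ++ _))
      .map(bytes => (ArchiveMetadata(storageObject.name), Source.single(bytes)))
  }
  .via(Archive.zip())
  .runWith(GCStorage.resumableUpload(bucket, s"${folder}_${uuid}_result.zip", contentType))

The Sink.fold concatenation subscribes to each entity right away, which should keep the connection pool happy, at the cost of holding a whole file in memory at a time.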
What am I missing or doing wrong? My only goal is to stream the files one after another.
Thanks a lot for any help.