4

Moin, I'd like to ask for solution. I want to zip content of folder on GCP bucket. I'm using Alpakka 5.0.0 to do that.

I've written following:

import akka.actor.typed.ActorSystem  
import akka.actor.typed.scaladsl.Behaviors  
import akka.stream.Materializer  
import akka.http.scaladsl.model.ContentType  
import akka.http.scaladsl.model.MediaTypes.`application/zip`  
import akka.stream.alpakka.file.ArchiveMetadata  
import akka.stream.alpakka.file.scaladsl.Archive  
import akka.stream.alpakka.googlecloud.storage.scaladsl.GCStorage  
import akka.stream.scaladsl.Source  
  
import java.util.UUID  
import scala.concurrent.ExecutionContext  
  
object GCPZipper extends App {  
  
  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "zip-streamer")  
  implicit val materializer: Materializer = Materializer(system)  
  implicit val executionContext: ExecutionContext = system.executionContext  
  
  private val bucket = "some-bucket-on-gcp"  
  private val folder = "some-folder-on-bucket"  
  private val contentType = ContentType(`application/zip`)  
  
  val uuid = UUID.randomUUID().toString  
  
  GCStorage.listBucket(bucket, Some(s"$folder/"))  
    .log("zip-streamer - after list")  
    .flatMapMerge(1, found => GCStorage.download(found.bucket, found.name).zip(Source.single(found)))  
    .log("zip-streamer - after download")  
    .collect({ case (Some(found), metadata) => (found, metadata) }) // to get rid of Option  
    .log("zip-streamer - after collect")  
    .map { case (found, metadata) => (ArchiveMetadata(metadata.name), found) }  
    .log("zip-streamer - after archive metadata")  
    .via(Archive.zip())  
    .runWith(GCStorage.resumableUpload(bucket, s"${folder}_${uuid}_result.zip", contentType))    
  
}

The problem is that after downloading, zipping and uploading of the 1st file in the bucket, the stream starts to process 2nd one, but it ends up with exception:

21:55:54.854 [zip-streamer-akka.actor.default-dispatcher-6] ERROR akka.stream.Materializer - [zip-streamer - after zip] Upstream failed.
java.util.concurrent.TimeoutException: Response entity was not subscribed after 1 second. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /storage/v1/b/some-bucket-on-gcp/o/643ffceb594787623f296844%2Fcog-hillshade-dtm-result.tiff Empty -> 200 OK Default(47874625 bytes)
    at akka.http.impl.engine.client.pool.SlotState$WaitingForResponseEntitySubscription.onTimeout(SlotState.scala:313)
    at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Event$.$anonfun$onTimeout$1(NewHostConnectionPool.scala:187)
    at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Event$.$anonfun$event0$1(NewHostConnectionPool.scala:189)
    at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.runOneTransition$1(NewHostConnectionPool.scala:277)

It looks like that 2nd request was already started before the 1st fully finished. This should not be the case because the requests in implementation are fired within Source.lazyFuture as:

// taken from DCStorageStram.scala

private def makeRequestSource[T: FromResponseUnmarshaller](request: Future[HttpRequest]): Source[T, NotUsed] =  
  Source  
    .fromMaterializer { (mat, attr) =>  
      implicit val settings = resolveSettings(mat, attr)  
      Source.lazyFuture { () =>  
        request.flatMap { request =>  
          GoogleHttp()(mat.system).singleAuthenticatedRequest[T](request)  
        }(ExecutionContexts.parasitic)  
      }  
    }  
    .mapMaterializedValue(_ => NotUsed)

What I'm missing, doing wrong etc. My only goal is to stream the files one after another.

Thanks a lot for help.

Teliatko
  • 1,521
  • 1
  • 14
  • 15

0 Answers0