
I am reading a .gz file from an AWS S3 bucket using Akka Streams. The application reads a chunk of data and processes it.

The following code works fine when I read the file from the beginning, but if I start from a particular offset, decompression fails with a ParsingException.

Code:

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.model.headers.ByteRange
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, KillSwitches}
import akka.stream.alpakka.s3.{MemoryBufferType, S3Settings}
import akka.stream.alpakka.s3.scaladsl.S3Client
import akka.stream.scaladsl.{Compression, Flow, Keep, Sink, Source}
import akka.util.{ByteString, Timeout}
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object StreamApp extends App {

    // Run 1
    //startStreaming(0)
    // Run 2
    startStreaming(33339)

    def startStreaming(pointer: Long): Unit = {
        println("Stream App Starting")

        implicit val system = ActorSystem()
        val materializerSettings = ActorMaterializerSettings(system)
        implicit val materializer = ActorMaterializer(materializerSettings)
        implicit val dispatcher: ExecutionContextExecutor = system.dispatcher
        implicit val timeout: Timeout = Timeout(1.second)

        val accessKey = "aws-access-key"
        val secretAccessKey = "aws-secret-access-key"

        val awsCredentials = new AWSStaticCredentialsProvider(
            new BasicAWSCredentials(accessKey, secretAccessKey)
        )

        val s3Region: String = "s3-region-name"
        val s3Bucket: String = "s3-bucket-name"
        val s3DataFile: String = "s3-object-path.gz"

        val settings = new S3Settings(MemoryBufferType, None, awsCredentials, s3Region, false)

        val s3Client = new S3Client(settings)(system, materializer)

        val currentOffset: Long = pointer
        val source: Source[ByteString, NotUsed] =
            s3Client
                .download(s3Bucket, s3DataFile, ByteRange.fromOffset(currentOffset))

        val flowDecompress: Flow[ByteString, ByteString, NotUsed] =
            Flow[ByteString].via(
                Compression.gunzip()
            )

        val flowToString: Flow[ByteString, String, NotUsed] =
            Flow[ByteString].map(_.utf8String)

        val sink: Sink[String, Future[Done]] = Sink.foreach(println)

        val (killSwitch, graph) =
            source
                .via(flowDecompress)
                .via(flowToString)
                .viaMat(KillSwitches.single)(Keep.right)
                .toMat(sink)(Keep.both)
                .run()

        graph.onComplete {
            case Success(_) =>
                println("Stream App >> File Data Extractor >> Stream completed successfully")
                killSwitch.shutdown()
            case Failure(e) =>
                // Without this case, a failed stream raises the
                // scala.MatchError seen in the logs below
                println(s"Stream App >> File Data Extractor >> Stream failed: ${e.getMessage}")
                killSwitch.shutdown()
        }
    }
}

The exception is as follows:

[ERROR] [02/26/2018 12:13:10.682] [default-akka.actor.default-dispatcher-2] [akka.dispatch.Dispatcher] Failure(akka.stream.impl.io.ByteStringParser$ParsingException: Parsing failed in step ReadHeaders) (of class scala.util.Failure)
scala.MatchError: Failure(akka.stream.impl.io.ByteStringParser$ParsingException: Parsing failed in step ReadHeaders) (of class scala.util.Failure)
    at StreamApp$$anonfun$startStreaming$1.apply(StreamApp.scala:71)
    at StreamApp$$anonfun$startStreaming$1.apply(StreamApp.scala:71)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:43)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
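Note that the scala.MatchError wrapping the ParsingException is a separate issue: the partial function passed to graph.onComplete only matches Success, so when the stream fails, the Failure value has no matching case. A minimal plain-Scala sketch (no Akka needed) reproducing that wrapping:

```scala
import scala.util.{Failure, Success, Try}

// onComplete applies its callback to a Try[T]; a partial function that
// only matches Success throws scala.MatchError when handed a Failure --
// which is exactly what wraps the ParsingException in the log above.
val result: Try[Int] = Failure(new RuntimeException("Parsing failed"))

val successOnlyHandler: PartialFunction[Try[Int], Unit] = {
  case Success(v) => println(s"completed with $v")
}

val outcome = Try(successOnlyHandler(result))
val isMatchError = outcome.failed.toOption.exists(_.isInstanceOf[MatchError])
println(s"handler threw MatchError: $isMatchError")
```

Adding a `case Failure(e) =>` branch to the onComplete callback surfaces the real ParsingException instead of the MatchError.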

I debugged the code, and the following is my observation:

When execution reaches the header check, readByte() returns 3 rather than the gzip magic byte 0x1f, so the parser fails to recognize the input as .gz format.

I am not able to decompress a chunk of the .gz file taken from an arbitrary offset. I would appreciate guidance on this problem; if possible, please provide pointers and correct me if I am wrong.
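Part of the underlying issue is that gzip offers no random access: the stream begins with a fixed header (magic bytes 0x1f 0x8b), and decompression must start at byte 0 of the compressed file. Downloading from an arbitrary compressed offset hands the gunzip stage mid-stream deflate data, which fails the header check. A plain-Scala sketch using java.util.zip illustrates this (the 10-byte skip is an arbitrary illustrative offset):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Compress some sample data in memory.
val data = ("hello gzip " * 100).getBytes("UTF-8")
val baos = new ByteArrayOutputStream()
val gzOut = new GZIPOutputStream(baos)
gzOut.write(data)
gzOut.close()
val compressed = baos.toByteArray

// The gzip magic bytes sit at the very start of the stream.
val hasMagic = (compressed(0) & 0xff) == 0x1f && (compressed(1) & 0xff) == 0x8b
println(s"starts with gzip magic: $hasMagic")

// Decompressing from a nonzero offset drops the header, so the header
// check fails (ZipException) -- the same class of error as Akka's
// "Parsing failed in step ReadHeaders".
val fromOffsetFailed =
  try { new GZIPInputStream(new ByteArrayInputStream(compressed.drop(10))); false }
  catch { case _: java.io.IOException => true }
println(s"decompression from offset 10 failed: $fromOffsetFailed")

// Workaround: decompress from byte 0 and skip in the *uncompressed*
// stream instead, i.e. track the resume offset in uncompressed bytes.
val in = new GZIPInputStream(new ByteArrayInputStream(compressed))
val out = new ByteArrayOutputStream()
val buf = new Array[Byte](4096)
var n = in.read(buf)
while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
val restored = out.toByteArray
println(s"round trip ok: ${restored.sameElements(data)}")
```

In the Akka code above, this would mean always downloading from offset 0 and dropping already-processed bytes after the gunzip stage rather than seeking in the compressed object.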

Version:

Akka Version = "2.5.9"

Akka Http Version = "10.0.11"

Alpakka Version = "0.14"

Nikhil

1 Answer


From the docs: Compression.gunzip() creates a Flow that decompresses a gzip-compressed stream of data.

You can use the file utility on Linux or macOS to see which algorithm was used to compress the file; in my case the file was compressed with zlib, so Compression.inflate() did the trick.
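To see the difference concretely: a zlib stream (what Compression.inflate() handles) typically begins with 0x78, while a gzip stream begins with 0x1f 0x8b and wraps the same deflate data in an extra header and trailer. A plain-Scala sketch with java.util.zip, whose Deflater emits zlib framing by default:

```scala
import java.util.zip.{Deflater, Inflater}

// Deflater with default settings emits zlib framing (RFC 1950),
// not the gzip container (RFC 1952).
val input = "sample payload".getBytes("UTF-8")
val deflater = new Deflater()
deflater.setInput(input)
deflater.finish()
val buf = new Array[Byte](256)
val compressedLen = deflater.deflate(buf)
deflater.end()

// zlib's first byte encodes the compression method and window size:
// 0x78 for the default deflate settings.
val firstByte = buf(0) & 0xff
println(f"first byte of zlib stream: 0x$firstByte%02x")

// Inflater (the java.util.zip counterpart of Compression.inflate())
// round-trips it; GZIPInputStream would reject it for missing the
// gzip magic bytes.
val inflater = new Inflater()
inflater.setInput(buf, 0, compressedLen)
val out = new Array[Byte](256)
val restoredLen = inflater.inflate(out)
inflater.end()
val restored = new String(out, 0, restoredLen, "UTF-8")
println(s"round trip: $restored")
```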

See also this question for a good explanation on how zlib, zip and gzip are related.

Diana