I am reading a .gz file from an AWS S3 bucket using Akka Streams. The application reads a chunk of data and processes it.
The following code works fine when I read the file from the beginning, but if I start reading the file from a particular offset, then decompression fails and throws a ParsingException.
Code:
object StreamApp extends App {
  // Run 1: succeeds — the gunzip stage sees the gzip header at byte 0.
  //startStreaming(0)
  // Run 2: fails — a gzip stream cannot be decompressed from an arbitrary
  // byte offset. The gzip header (magic bytes 0x1f 0x8b) must be the first
  // thing the gunzip stage reads, so `ByteRange.fromOffset(33339)` hands it
  // mid-stream deflate data and `ReadHeaders` throws a ParsingException.
  // To resume mid-file, download from offset 0 and skip already-processed
  // *decompressed* bytes downstream instead.
  startStreaming(33339)

  /** Downloads a .gz object from S3 starting at byte `pointer`, gunzips the
    * stream, decodes it as UTF-8 and prints it.
    *
    * NOTE: decompression can only succeed when `pointer == 0` (see above).
    *
    * @param pointer byte offset into the *compressed* S3 object
    */
  def startStreaming(pointer: Long): Unit = {
    println("Stream App Starting")
    implicit val system = ActorSystem()
    val materializerSettings = ActorMaterializerSettings(system)
    implicit val materializer = ActorMaterializer(materializerSettings)
    implicit val dispatcher: ExecutionContextExecutor = system.dispatcher
    implicit val timeout: Timeout = Timeout(1 seconds)

    // NOTE(review): credentials belong in configuration / a credentials
    // provider chain, not in source code — placeholders kept as-is here.
    val accessKey = "aws-access-key"
    val secretAccessKey = "aws-secret-access-key"
    val awsCredentials = new AWSStaticCredentialsProvider(
      new BasicAWSCredentials(accessKey, secretAccessKey)
    )
    val s3Region: String = "s3-region-name"
    val s3Bucket: String = "s3-bucket-name"
    val s3DataFile: String = "s3-object-path.gz"
    val settings = new S3Settings(MemoryBufferType, None, awsCredentials, s3Region, false)
    val s3Client = new S3Client(settings)(system, materializer)

    val currentOffset: Long = pointer
    // Ranged GET: returns the raw (still compressed) bytes from `currentOffset`.
    val source: Source[ByteString, NotUsed] =
      s3Client
        .download(s3Bucket, s3DataFile, ByteRange.fromOffset(currentOffset))

    val flowDecompress: Flow[ByteString, ByteString, NotUsed] =
      Flow[ByteString].via(
        Compression.gunzip()
      )
    val flowToString: Flow[ByteString, String, NotUsed] =
      Flow[ByteString].map(_.utf8String)
    val sink: Sink[String, Future[Done]] = Sink.foreach(println)

    val (killSwitch, graph) =
      source
        .via(flowDecompress)
        .via(flowToString)
        .viaMat(KillSwitches.single)(Keep.right)
        .toMat(sink)(Keep.both)
        .run()

    // BUG FIX: `onComplete` is invoked with a `Try[Done]`. The original
    // partial function only matched `Success(_)`, so any stream failure
    // (e.g. the gunzip ParsingException) escaped as a scala.MatchError —
    // exactly the error visible in the pasted stack trace. Handle both
    // outcomes and always release the kill switch.
    graph.onComplete {
      case Success(_) =>
        println("Stream App >> File Data Extractor >> Stream completed successfully")
        killSwitch.shutdown()
      case Failure(ex) =>
        println(s"Stream App >> File Data Extractor >> Stream failed: ${ex.getMessage}")
        killSwitch.shutdown()
    }
  }
}
The exception as follows:
[ERROR] [02/26/2018 12:13:10.682] [default-akka.actor.default-dispatcher-2] [akka.dispatch.Dispatcher] Failure(akka.stream.impl.io.ByteStringParser$ParsingException: Parsing failed in step ReadHeaders) (of class scala.util.Failure)
scala.MatchError: Failure(akka.stream.impl.io.ByteStringParser$ParsingException: Parsing failed in step ReadHeaders) (of class scala.util.Failure)
at StreamApp$$anonfun$startStreaming$1.apply(StreamApp.scala:71)
at StreamApp$$anonfun$startStreaming$1.apply(StreamApp.scala:71)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:43)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I debugged the code, and the following is my observation:
On execution of this line, readByte() returns 3, so the parser fails to recognize the input as being in .gz format.
I am not able to decompress a chunk of the .gz file's data starting from an offset. I would like guidance on this problem; if possible, please provide pointers and correct me if I am wrong.
Version:
Akka Version = "2.5.9"
Akka Http Version = "10.0.11"
Alpakka Version = "0.14"