11

I am trying to create a serverless processor for my cron job. In this job I receive a zipped file in my S3 bucket from one of my clients. The file is around 50 MB, but once unzipped it grows to 1.5 GB, and there is a hard limit of roughly 500 MB on the disk space available to AWS Lambda, so I cannot download the file from the S3 bucket and unzip it on my Lambda. I was able to unzip the file and stream its content line by line from S3 using funzip in a Unix script:

for x in $files ; do echo -n "$x: " ; timeout 5 aws s3 cp "$monkeydir/$x" - | funzip ; done

My bucket name: MonkeyBusiness, key: /Daily/Business/Banana/{current-date}, object: banana.zip

But now that I am trying to achieve the same output using boto3: how can I stream the zipped content, unzip the stream, save the content in separate files of 10,000 lines each, and upload the chunked files back to S3? I need guidance as I am pretty new to AWS and boto3.

Please let me know if you need more details about the job.

The suggested solution given below is not applicable here, because the zlib documentation clearly states that the library handles the gzip file format, while my question is about the zip file format.

import zlib

def stream_gzip_decompress(stream):
    dec = zlib.decompressobj(32 + zlib.MAX_WBITS)  # offset 32 to skip the header
    for chunk in stream:
        rv = dec.decompress(chunk)
        if rv:
            yield rv 
Shek
  • To unzip and stream chunk by chunk, please look at io.StringIO or io.BytesIO and the zlib module. – mootmoot Sep 05 '17 at 17:01
  • @mootmoot how to stream it from S3 is what is giving me heartache. – Shek Sep 06 '17 at 15:40
  • 1
    Try this out and replace the file write with s3.put_object() that say it support streaming https://stackoverflow.com/questions/27035296/python-how-to-gzip-a-large-text-file-without-memoryerror – mootmoot Sep 06 '17 at 15:51
  • @mootmoot that won't apply here; I am using Lambda to stream and decompress a zipped file from S3, while the suggested example compresses a file on EC2. Also, I need the data line by line, not by block size, because a block might break a line :( – Shek Sep 06 '17 at 16:07
  • 1
    I think you get it wrong, blocked reading data doesn't break the line, it just tread data as stream bytes. And block unzip is more troublesome – mootmoot Sep 06 '17 at 16:23
  • Possible duplicate of [Python unzipping stream of bytes?](https://stackoverflow.com/questions/12571913/python-unzipping-stream-of-bytes) – Phil Booth Sep 21 '17 at 06:55

2 Answers

8

So I used BytesIO to read the compressed file into a buffer object, then used zipfile to open the buffer as a zip archive, and I was able to read the data line by line.

import io
import zipfile
import boto3
import sys

s3 = boto3.resource('s3', 'us-east-1')


def stream_zip_file():
    count = 0
    obj = s3.Object(
        bucket_name='MonkeyBusiness',
        key='/Daily/Business/Banana/{current-date}/banana.zip'
    )
    # Read the compressed object (~50 MB) into an in-memory buffer;
    # zipfile then decompresses it lazily as we iterate over lines.
    buffer = io.BytesIO(obj.get()["Body"].read())
    print(buffer)
    z = zipfile.ZipFile(buffer)
    foo2 = z.open(z.infolist()[0])
    print(sys.getsizeof(foo2))
    line_counter = 0
    for _ in foo2:
        line_counter += 1
    print(line_counter)
    z.close()


if __name__ == '__main__':
    stream_zip_file()
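
To also split the stream into files of 10,000 lines each and upload them back to S3 (the second half of the question), a minimal sketch along the same lines is below; the output key prefix, file naming, and chunk-size parameter are my own assumptions, not something from the original job.

import io
import zipfile
import boto3

s3 = boto3.resource('s3', 'us-east-1')


def split_and_upload(bucket='MonkeyBusiness',
                     key='/Daily/Business/Banana/{current-date}/banana.zip',
                     out_prefix='/Daily/Business/Banana/chunks/',  # hypothetical output location
                     lines_per_chunk=10000):
    obj = s3.Object(bucket_name=bucket, key=key)
    # The compressed bytes (~50 MB) fit in memory; the decompressed 1.5 GB never
    # does, because we read the zip member line by line and flush every chunk.
    buffer = io.BytesIO(obj.get()["Body"].read())
    with zipfile.ZipFile(buffer) as z:
        with z.open(z.infolist()[0]) as member:
            chunk, chunk_no = [], 0
            for line in member:
                chunk.append(line)
                if len(chunk) == lines_per_chunk:
                    s3.Object(bucket, '%schunk_%05d.txt' % (out_prefix, chunk_no)).put(
                        Body=b''.join(chunk))
                    chunk, chunk_no = [], chunk_no + 1
            if chunk:  # upload the final partial chunk
                s3.Object(bucket, '%schunk_%05d.txt' % (out_prefix, chunk_no)).put(
                    Body=b''.join(chunk))

put() uploads each chunk in a single request, which is fine at this chunk size; for much larger chunks, upload_fileobj would be the streaming alternative.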
Shek
1

This is not an exact answer, but you can try this out.

First, please adapt the answer mentioned above about gzipping a large file with limited memory; that method lets you stream a file chunk by chunk. And boto3's S3 put_object() and upload_fileobj seem to allow streaming.

You need to mix and adapt the above-mentioned code with the following decompression.

import io
import gzip
import boto3

stream = io.BytesIO(s3_data)  # in-memory buffer holding the compressed bytes
with gzip.GzipFile(fileobj=stream) as decompressor:
    # upload_fileobj reads the file-like object in chunks internally, so the
    # decompressed data is streamed to S3 without holding it all in memory
    boto3.client('s3').upload_fileobj(decompressor, "bucket", "key")

I cannot guarantee the above code will work; it just gives you the idea of decompressing the file and re-uploading it in chunks. You might even need to pipe the decompressed data into a BytesIO and pass that to upload_fileobj. There is a lot of testing to do.

If you don't need to decompress the file ASAP, my suggestion is to use Lambda to put the file into an SQS queue. When there are "enough" files, trigger a spot instance (which will be pretty cheap) that will read the queue and process the files.
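
A minimal sketch of that Lambda-to-SQS handoff, assuming an S3 event trigger; the queue URL and message format are placeholders, not part of the original setup:

import json
import boto3

sqs = boto3.client('sqs')
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/banana-zips'  # hypothetical queue


def lambda_handler(event, context):
    # Forward the bucket/key of each newly arrived zip to SQS; a spot
    # instance polling the queue later does the actual unzip-and-split work.
    for record in event.get('Records', []):
        body = {
            'bucket': record['s3']['bucket']['name'],
            'key': record['s3']['object']['key'],
        }
        sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=json.dumps(body))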

mootmoot