I want to read 100K+ files from S3 and zip them into a single large file. Individual files range from a few KB to 1 MB, and the final zip file could easily exceed 3 GB. AWS Lambda has a memory limit of 3 GB and only 512 MB of /tmp storage. How would you do this using AWS Lambda? I am using .NET Core 3.

The code below fails once the zip size goes beyond 3 GB, because the whole archive is built in a MemoryStream:

    var zipStream = new MemoryStream();
    using (System.IO.Compression.ZipArchive zip = new ZipArchive(zipStream, ZipArchiveMode.Create, true))
    {
        for (int i = 0; i < sourceFiles.Count; i++)
        {
            
            var zipItem = zip.CreateEntry("file"+i.ToString()+".pdf");

            using (var entryStream = zipItem.Open())
            {
                var source  = GetFileFromS3(sourceFiles[i]);
                await source.CopyToAsync(entryStream);
            }
        }
    }

    // Upload the zip file to S3. For brevity, the upload code is not included.
    _s3Client.Upload(zipStream); 

Most of the examples I have seen for large-file processing use Node.js and don't go beyond 3 GB either. I am looking for a C# .NET Core example. I am also trying to avoid splitting the archive into multiple zip files of less than 3 GB each.

1. How would you do this using AWS Lambda without splitting the zip file?
2. Is there an S3 stream available that would read from and write to S3 directly?

Camilo Terevinto
LP13
  • If you have a total storage space of 3GB (RAM) + 0.5GB (disk), you obviously cannot go beyond 3.5GB (and probably it's less than this). You need more space -> AWS Lambda is not suitable for this task – Camilo Terevinto Nov 24 '20 at 17:29
  • You can use swap space when you run out of memory which will use a temp file in place of memory. It runs slower but will resolve issue. See : https://www.reddit.com/r/aws/comments/b2zijf/swap_space_when_using_aws_linux_based_ami/ – jdweng Nov 24 '20 at 17:35
  • @jdweng I think that's only valid for an EC2 instance, not for serverless Lambda. I could be wrong though. – LP13 Nov 24 '20 at 18:09
  • @LP13 : If the machine has a file system (smart card) then it is applicable. Has nothing to do with being serverless. – jdweng Nov 24 '20 at 18:13
  • Lambda is event driven. If you're doing this once then it would be easiest to spin up an EC2 instance with a reasonable amount of disk space, run the program, and get rid of the instance. If you need to do it frequently then create an EC2 AMI with the correct "stuff" and use it to create a temporary EC2 instance and run it as needed. – stdunbar Nov 24 '20 at 20:14
  • If the use case is that you need to react on file upload you can use AWS Batch Jobs as EventBridge (formerly known as CloudWatch Events) event target https://docs.aws.amazon.com/batch/latest/userguide/batch-cwe-target.html – jimmone Nov 26 '20 at 16:49

2 Answers

Starting today (December 1, 2020), you can allocate up to 10 GB of memory. This may be enough for your purposes, at least for now. https://aws.amazon.com/blogs/aws/new-for-aws-lambda-functions-with-up-to-10-gb-of-memory-and-6-vcpus/

Another option may be to utilize Amazon EFS for storage if you can adapt your code to avoid requiring it all to be in memory. EFS support for Lambda was launched earlier this year. https://aws.amazon.com/blogs/compute/using-amazon-efs-for-aws-lambda-in-your-serverless-applications/
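
As a rough illustration of the EFS idea, the sketch below writes the archive to a FileStream instead of a MemoryStream, so only the entry currently being copied is held in memory. It assumes the EFS file system is mounted at a path you pass in (for example a hypothetical `/mnt/efs/output.zip`), and it uses small in-memory streams as stand-ins for the S3 object streams; `EfsZipSketch` and `WriteZip` are illustrative names, not part of any library.

```csharp
using System.IO;
using System.IO.Compression;

public static class EfsZipSketch
{
    // zipPath is assumed to point into the mounted file system,
    // e.g. "/mnt/efs/output.zip" (hypothetical mount path).
    public static void WriteZip(string zipPath, int fileCount)
    {
        using var output = new FileStream(zipPath, FileMode.Create, FileAccess.Write);
        using var zip = new ZipArchive(output, ZipArchiveMode.Create);

        for (var i = 0; i < fileCount; i++)
        {
            var entry = zip.CreateEntry($"file{i}.pdf");

            // Both streams are disposed at the end of each iteration,
            // so only one entry is open at any time.
            using var entryStream = entry.Open();

            // Stand-in for the stream returned by the S3 download.
            using var source = new MemoryStream(new byte[] { 1, 2, 3 });
            source.CopyTo(entryStream);
        }
    }
}
```

The finished file would still have to be uploaded to S3 afterwards, for example with a multipart upload that reads from the file.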

stefansundin

For large uploads you'll need to use the S3 multipart upload feature. With a little work you can write a wrapper stream that buffers data in a MemoryStream until it reaches the minimum part size (5 MB) and then uploads each full buffer as one part, so the whole archive never has to be in memory at once.
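
To get a feel for the numbers, here is a small sketch of the part arithmetic. It relies on two S3 multipart limits: every part except the last must be at least 5 MiB, and an upload can have at most 10,000 parts. `PartMath` and its members are illustrative names only.

```csharp
using System;

public static class PartMath
{
    public const long MinPartSize = 5L * 1024 * 1024; // 5 MiB minimum for all parts but the last
    public const int MaxParts = 10_000;               // S3 limit on parts per upload

    // Number of parts needed for totalBytes at a fixed part size (rounded up).
    public static long PartCount(long totalBytes, long partSize) =>
        (totalBytes + partSize - 1) / partSize;

    // Smallest part size that keeps totalBytes within the 10,000-part limit.
    public static long MinPartSizeFor(long totalBytes) =>
        Math.Max(MinPartSize, (totalBytes + MaxParts - 1) / MaxParts);
}
```

With 5 MiB parts, a 3 GiB archive needs 615 parts, and the 10,000-part limit caps the archive at 50,000 MiB (roughly 48.8 GiB); anything larger needs a bigger part size.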

Here's the code I wrote for this specific case:

using System.IO.Compression;

using Microsoft.Extensions.Logging;

using Amazon.S3;
using Amazon.S3.Model;

using Atelie.Dev.Utils;

var s3Client = new AmazonS3Client();
var req = new InitiateMultipartUploadRequest { BucketName = "...", Key = "...", ContentType = "application/zip" };
var res = await s3Client.InitiateMultipartUploadAsync(req);

var loggerFactory = new LoggerFactory();
var logger = loggerFactory.CreateLogger<S3MultipartUploadStream>();

await using (var upload = new S3MultipartUploadStream(s3Client, res, logger))
{
    // leaveOpen: true is mandatory; otherwise ZipArchive would dispose our stream (aborting the upload) before CompleteUploadAsync runs
    using (var archive = new ZipArchive(upload, ZipArchiveMode.Create, leaveOpen: true))
    {
        var ze = archive.CreateEntry("...");
        await using var es = ze.Open();
        // write what you need to es
    }

    await upload.CompleteUploadAsync();
}
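
To connect this with the loop from the question, here is a minimal sketch of copying many sources into the archive one entry at a time. It is written against plain `Stream` so it can run without AWS; `ZipToStreamSketch`, `ZipAsync` and the `openSource` delegate are hypothetical names. In the real function, `openSource` would presumably wrap something like `GetObjectAsync` and return the response stream, and `output` would be the multipart upload stream.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks;

public static class ZipToStreamSketch
{
    // Copies each source into its own zip entry, one at a time, so memory use
    // is bounded by the copy buffer plus whatever the output stream buffers.
    public static async Task ZipAsync(
        IReadOnlyList<string> keys,
        Func<string, Task<Stream>> openSource, // hypothetical: opens the source for a key
        Stream output)
    {
        // leaveOpen: true so the caller decides when to complete or dispose the output.
        using var zip = new ZipArchive(output, ZipArchiveMode.Create, leaveOpen: true);

        for (var i = 0; i < keys.Count; i++)
        {
            var entry = zip.CreateEntry($"file{i}.pdf");

            // Disposed at the end of each iteration, before the next entry is created.
            using var entryStream = entry.Open();
            using var source = await openSource(keys[i]);
            await source.CopyToAsync(entryStream);
        }
    }
}
```

The central directory is written when the ZipArchive is disposed at the end of `ZipAsync`, so the upload should only be completed after the method returns.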

And here's the code for the S3MultipartUploadStream class (beware, here be dragons):

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Amazon.S3;
using Amazon.S3.Model;

using Microsoft.Extensions.Logging;

namespace Atelie.Dev.Utils;

public class S3MultipartUploadStream : Stream
{
    private const int MinPartSize = 5 * 1024 * 1024; // 5MB
    private const int BufferCapacity = MinPartSize;

    private readonly IAmazonS3 _s3Client;
    private readonly InitiateMultipartUploadResponse _mur;
    private readonly ILogger<S3MultipartUploadStream> _logger;
    private readonly ConcurrentBag<PartETag> _parts = new();
    private readonly ConcurrentBag<Task> _uploadTasks = new();
    private readonly CancellationTokenSource _cts = new();

    private readonly int _concurrency;

    private volatile MemoryStream _mem = new(BufferCapacity);
    private Task? _completedTask;
    private long _realPosition;
    private int _partNumber;

    public S3MultipartUploadStream(IAmazonS3 s3Client, InitiateMultipartUploadResponse mur, ILogger<S3MultipartUploadStream> logger, int concurrency = 4)
    {
        _s3Client = s3Client;
        _mur = mur;
        _logger = logger;
        _concurrency = concurrency;
    }

    /// <summary>
    /// Finishes uploading the pending parts, and marks the multipart upload as completed.
    /// If you don't call this method, the upload will be aborted when calling <see cref="Dispose"/>.
    /// </summary>
    public Task CompleteUploadAsync()
    {
        if (_completedTask != null)
            throw new InvalidOperationException("Upload already finished or aborted");

        return _completedTask = Task.Run(async () =>
        {
            if (_mem.Position > 0)
                await UploadPart(true);

            // waits for all upload tasks to finish without blocking a thread
            await Task.WhenAll(_uploadTasks);

            var req = new CompleteMultipartUploadRequest
            {
                BucketName = _mur.BucketName,
                Key = _mur.Key,
                UploadId = _mur.UploadId,
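                // S3 expects the parts in ascending part-number order; a ConcurrentBag guarantees no order
                PartETags = _parts.OrderBy(p => p.PartNumber).ToList(),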
            };
            _logger.LogInformation("Finished upload of {Count} parts", req.PartETags.Count);

            await _s3Client.CompleteMultipartUploadAsync(req);
        });
    }

    /// <summary>
    /// Aborts the upload and its parts. Called automatically if <see cref="Dispose"/> is called before
    /// <see cref="CompleteUploadAsync"/>.
    /// </summary>
    private Task AbortUploadAsync()
    {
        return _completedTask ??= Task.Run(async () =>
        {
            // cancels any running upload
            _cts.Cancel();

            var req = new AbortMultipartUploadRequest
            {
                BucketName = _mur.BucketName,
                Key = _mur.Key,
                UploadId = _mur.UploadId,
            };
            _logger.LogInformation("Aborting upload of: {Key}", req.Key);

            await _s3Client.AbortMultipartUploadAsync(req);
        });
    }

    /// <summary>
    /// Uploads the current MemoryStream asynchronously, and replaces it with a fresh one for the next part.
    /// </summary>
    private async Task UploadPart(bool isLastPart)
    {
        // if the current position is zero, we don't need to perform the upload
        if (_mem.Position == 0)
            return;

        if (_mem.Length < MinPartSize && !isLastPart)
            throw new InvalidOperationException("You can't upload a part smaller than 5MB, unless it's the last one");

        // if we're over the configured concurrency, wait
        while (_uploadTasks.Count(t => !t.IsCompleted) >= _concurrency)
            await Task.Delay(100, _cts.Token);

        // sets the part number and makes a copy of the stream to upload
        var partNumber = Interlocked.Increment(ref _partNumber);
        var msToWrite = _mem;

        // creates the new stream for the next part
        _mem = new MemoryStream(BufferCapacity);

        // starts the upload task
        _uploadTasks.Add(Task.Run(async () =>
        {
            await using (msToWrite)
            {
                msToWrite.Position = 0;
                var req = new UploadPartRequest
                {
                    BucketName = _mur.BucketName,
                    Key = _mur.Key,
                    InputStream = msToWrite,
                    PartNumber = partNumber,
                    PartSize = msToWrite.Length,
                    UploadId = _mur.UploadId,
                    IsLastPart = isLastPart,
                };
                _logger.LogInformation("Part {Seq}: starting upload of {Size} bytes", req.PartNumber, req.PartSize);

                var res = await _s3Client.UploadPartAsync(req, _cts.Token);
                _parts.Add(new PartETag(req.PartNumber, res.ETag));

                _logger.LogInformation("Part {Seq}: upload finished", res.PartNumber);
            }
        }));
    }

    #region Dispose and DisposeAsync

    protected override void Dispose(bool disposing)
    {
        var endTask = AbortUploadAsync();
        endTask.GetAwaiter().GetResult();
        endTask.Dispose();

        _mem.Dispose();
        _cts.Dispose();

        base.Dispose(disposing);
    }

    public override async ValueTask DisposeAsync()
    {
        // aborts the upload unless CompleteUploadAsync has already been called
        await AbortUploadAsync();

        // the base implementation calls Dispose(), which runs Dispose(true) above
        // and releases _mem and _cts, so nothing needs to be disposed twice here
        await base.DisposeAsync();

        GC.SuppressFinalize(this);
    }

    #endregion

    #region Write and WriteAsync

    public override void Write(byte[] buffer, int offset, int count)
    {
        while (count > 0)
        {
            var limit = _mem.Capacity - (int) _mem.Position;

            if (limit == 0)
                UploadPart(false).GetAwaiter().GetResult();
            else
            {
                var written = Math.Min(limit, count);
                _mem.Write(buffer, offset, written);
                offset += written;
                count -= written;
                Interlocked.Add(ref _realPosition, written);
            }
        }
    }

    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        await WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
    }

    public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
    {
        var unwrittenBytes = buffer.Length;
        var offset = 0;
        while (unwrittenBytes > 0)
        {
            var limit = _mem.Capacity - (int) _mem.Position;

            if (limit == 0)
                await UploadPart(false);
            else
            {
                var currentSliceSize = Math.Min(limit, unwrittenBytes);
                await _mem.WriteAsync(buffer.Slice(offset, currentSliceSize), cancellationToken);
                offset += currentSliceSize;
                unwrittenBytes -= currentSliceSize;
                Interlocked.Add(ref _realPosition, currentSliceSize);
            }
        }
    }

    #endregion

    #region Remainder of the Stream implementation

    // NotSupportedException is what the Stream contract specifies for unsupported operations
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    public override void Flush() {}

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();

    public override long Position
    {
        get => _realPosition;
        set => throw new NotSupportedException();
    }

    #endregion
}
Fábio Batista