For large uploads you'll need to use the S3 Multipart Upload feature. With a little bit of work you can write a wrapper stream that buffers writes into a MemoryStream until it reaches the minimum part size (5 MB), then flushes that buffer as an upload part.
Here's the code I wrote for this specific case:
using System.IO.Compression;
using Microsoft.Extensions.Logging;
using Amazon.S3;
using Amazon.S3.Model;
using Atelie.Dev.Utils;
var s3Client = new AmazonS3Client();
var req = new InitiateMultipartUploadRequest { BucketName = "...", Key = "...", ContentType = "application/zip" };
var res = await s3Client.InitiateMultipartUploadAsync(req);
var loggerFactory = new LoggerFactory();
var logger = loggerFactory.CreateLogger<S3MultipartUploadStream>();
await using (var upload = new S3MultipartUploadStream(s3Client, res, logger))
{
// leaveOpen is mandatory: we don't want ZipArchive to close the upload stream, since we still need to call CompleteUploadAsync on it
using (var archive = new ZipArchive(upload, ZipArchiveMode.Create, leaveOpen: true))
{
var ze = archive.CreateEntry("...");
await using var es = ze.Open();
// write what you need to es
}
await upload.CompleteUploadAsync();
}
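For example, in place of the "write what you need to es" comment you could stream an existing file into the entry; the source path below is purely illustrative:
// e.g. copy a local file into the zip entry
await using var source = File.OpenRead("/tmp/big-file.bin");
await source.CopyToAsync(es);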
And here's the code for the S3MultipartUploadStream class (beware, here be dragons):
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Amazon.S3;
using Amazon.S3.Model;
using Microsoft.Extensions.Logging;
namespace Atelie.Dev.Utils;
public class S3MultipartUploadStream : Stream
{
private const int MinPartSize = 5 * 1024 * 1024; // 5MB
private const int BufferCapacity = MinPartSize;
private readonly IAmazonS3 _s3Client;
private readonly InitiateMultipartUploadResponse _mur;
private readonly ILogger<S3MultipartUploadStream> _logger;
private readonly ConcurrentBag<PartETag> _parts = new();
private readonly ConcurrentBag<Task> _uploadTasks = new();
private readonly CancellationTokenSource _cts = new();
private readonly int _concurrency;
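// buffer for the part currently being assembled; swapped for a fresh MemoryStream whenever a part is flushed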
private volatile MemoryStream _mem = new(BufferCapacity);
private Task? _completedTask;
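// total number of bytes written to this stream so far, exposed through Position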
private long _realPosition;
private int _partNumber;
public S3MultipartUploadStream(IAmazonS3 s3Client, InitiateMultipartUploadResponse mur, ILogger<S3MultipartUploadStream> logger, int concurrency = 4)
{
_s3Client = s3Client;
_mur = mur;
_logger = logger;
_concurrency = concurrency;
}
/// <summary>
/// Finishes uploading the pending parts, and marks the multipart upload as completed.
/// If you don't call this method, the upload will be aborted when calling <see cref="Dispose"/>.
/// </summary>
public Task CompleteUploadAsync()
{
if (_completedTask != null)
throw new InvalidOperationException("Upload already finished or aborted");
return _completedTask = Task.Run(async () =>
{
if (_mem.Position > 0)
await UploadPart(true);
// waits for all upload tasks to finish
await Task.WhenAll(_uploadTasks);
var req = new CompleteMultipartUploadRequest
{
BucketName = _mur.BucketName,
Key = _mur.Key,
UploadId = _mur.UploadId,
// S3 requires the part list to be in ascending part-number order
PartETags = _parts.OrderBy(p => p.PartNumber).ToList(),
};
_logger.LogInformation("Finished upload of {Count} parts", req.PartETags.Count);
await _s3Client.CompleteMultipartUploadAsync(req);
});
}
/// <summary>
/// Aborts the upload and its parts. Called automatically if <see cref="Dispose"/> is called before
/// <see cref="CompleteUploadAsync"/>.
/// </summary>
private Task AbortUploadAsync()
{
return _completedTask ??= Task.Run(async () =>
{
// cancels any running upload
_cts.Cancel();
var req = new AbortMultipartUploadRequest
{
BucketName = _mur.BucketName,
Key = _mur.Key,
UploadId = _mur.UploadId,
};
_logger.LogInformation("Aborting upload of: {Key}", req.Key);
await _s3Client.AbortMultipartUploadAsync(req);
});
}
/// <summary>
/// Uploads the current MemoryStream asynchronously, and replaces the MemoryStream with a pristine one.
/// </summary>
private async Task UploadPart(bool isLastPart)
{
// if the current position is zero, we don't need to perform the upload
if (_mem.Position == 0)
return;
if (_mem.Length < MinPartSize && !isLastPart)
throw new InvalidOperationException("You can't upload a part smaller than 5MB, unless it's the last one");
// if we're over the configured concurrency, wait
while (_uploadTasks.Count(t => !t.IsCompleted) >= _concurrency)
await Task.Delay(100, _cts.Token);
// assigns the part number and takes ownership of the current buffer for upload
var partNumber = Interlocked.Increment(ref _partNumber);
var msToWrite = _mem;
// creates the new stream for the next part
_mem = new MemoryStream(BufferCapacity);
// starts the upload task
_uploadTasks.Add(Task.Run(async () =>
{
await using (msToWrite)
{
msToWrite.Position = 0;
var req = new UploadPartRequest
{
BucketName = _mur.BucketName,
Key = _mur.Key,
InputStream = msToWrite,
PartNumber = partNumber,
PartSize = msToWrite.Length,
UploadId = _mur.UploadId,
IsLastPart = isLastPart,
};
_logger.LogInformation("Part {Seq}: staring upload of {Size} bytes", req.PartNumber, req.PartSize);
var res = await _s3Client.UploadPartAsync(req, _cts.Token);
_parts.Add(new PartETag(req.PartNumber, res.ETag));
_logger.LogInformation("Part {Seq}: upload finished", res.PartNumber);
}
}));
}
#region Dispose and DisposeAsync
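// Disposing before CompleteUploadAsync has been called aborts the multipart upload;
// once the upload has completed (or was already aborted), AbortUploadAsync is a no-op.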
protected override void Dispose(bool disposing)
{
var endTask = AbortUploadAsync();
endTask.GetAwaiter().GetResult();
endTask.Dispose();
_mem.Dispose();
_cts.Dispose();
base.Dispose(disposing);
}
public override async ValueTask DisposeAsync()
{
GC.SuppressFinalize(this);
var endTask = AbortUploadAsync();
// finish (or abort) the upload and release the buffer; base class cleanup runs last
await Task.WhenAll(_mem.DisposeAsync().AsTask(), endTask);
_cts.Dispose();
await base.DisposeAsync();
}
#endregion
#region Write and WriteAsync
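// Writes accumulate in _mem; when the buffer reaches BufferCapacity (the 5 MB minimum part size),
// it is handed off to UploadPart and replaced with a fresh buffer.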
public override void Write(byte[] buffer, int offset, int count)
{
while (count > 0)
{
var limit = _mem.Capacity - (int) _mem.Position;
if (limit == 0)
UploadPart(false).GetAwaiter().GetResult();
else
{
var written = Math.Min(limit, count);
_mem.Write(buffer, offset, written);
offset += written;
count -= written;
Interlocked.Add(ref _realPosition, written);
}
}
}
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
}
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
var unwrittenBytes = buffer.Length;
var offset = 0;
while (unwrittenBytes > 0)
{
var limit = _mem.Capacity - (int) _mem.Position;
if (limit == 0)
await UploadPart(false);
else
{
var currentSliceSize = Math.Min(limit, unwrittenBytes);
await _mem.WriteAsync(buffer.Slice(offset, currentSliceSize), cancellationToken);
offset += currentSliceSize;
unwrittenBytes -= currentSliceSize;
Interlocked.Add(ref _realPosition, currentSliceSize);
}
}
}
#endregion
#region Rest of the Stream implementation
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
// nothing to flush on demand: parts are uploaded as soon as the buffer fills up
public override void Flush() {}
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => throw new NotSupportedException();
public override long Position
{
get => _realPosition;
set => throw new NotSupportedException();
}
#endregion
}
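One caveat: S3 allows at most 10,000 parts per multipart upload, so with 5 MB parts this approach tops out at roughly 48 GB per object; if you expect larger archives, raise BufferCapacity above the 5 MB minimum.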