amazon-web-services, amazon-s3, .net-core, aws-sdk-net, c#-ziparchive

Zip multiple S3 files and stream the archive back to S3 using limited memory


I am trying to use .NET 8 to zip a large number of files stored in S3 and save the archive to another S3 location, while consuming a limited amount of memory.

Basically my needs are:

  • the source files are numerous and potentially large, so downloading them all before zipping is not an option;
  • the resulting archive must be written to another S3 location;
  • overall memory usage must stay bounded (around 100 MB).

So the script should be able to start downloading & zipping the files into a memory buffer, stream the archive back to S3 in parallel, "forget" about the buffered data it has already uploaded, and pause the download if the 100 MB in-memory buffer is full (the upload speed might be limited).

I tried to fiddle with memory streams, System.IO.Compression.ZipArchive and Amazon.S3.Transfer.ITransferUtility, but could not come up with a working solution.
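For illustration, my naive starting point looked something like the sketch below (assuming s3Transfer is a TransferUtility, s3Objects the list of source objects, and the TARGET_* constants the destination). It produces a valid archive, but it buffers the whole archive in a MemoryStream, so memory usage grows with the total archive size:

    // Naive approach: the entire archive is held in memory before the upload starts
    using MemoryStream buffer = new();
    using (ZipArchive zip = new(buffer, ZipArchiveMode.Create, leaveOpen: true))
    {
        foreach (S3Object obj in s3Objects)
        {
            await using Stream source = await s3Transfer.OpenStreamAsync(obj.BucketName, obj.Key);
            await using Stream entry = zip.CreateEntry(obj.Key).Open();
            await source.CopyToAsync(entry);
        }
    }
    buffer.Position = 0;
    await s3Transfer.UploadAsync(buffer, TARGET_BUCKET_NAME, TARGET_OBJECT_KEY);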


Solution

  • I came up with a working solution using System.IO.Pipelines (see documentation).

    It uses an in-memory buffer (pipe) of 100 MB which is filled by a writer thread (download from S3, zip, write to pipe) and consumed by a reader thread (read from pipe, upload to S3). When the buffer is full, the writer thread pauses until the reader thread has consumed enough data and the buffer gets below a "resume" threshold of 50 MB.

    using Amazon.S3;
    using Amazon.S3.Model;
    using Amazon.S3.Transfer;
    using System.IO.Compression;
    using System.IO.Pipelines;
    
    const string SOURCE_BUCKET_NAME = "my-bucket";
    const string SOURCE_PREFIX = "source/";
    const string TARGET_BUCKET_NAME = "my-bucket";
    const string TARGET_OBJECT_KEY = "export.zip";
    
    // pause download when buffer gets above 100 MB
    const long PIPE_PAUSE_WRITER_THRESHOLD = 100_000_000;
    
    // resume download when buffer gets below 50 MB
    const long PIPE_RESUME_WRITER_THRESHOLD = 50_000_000;
    
    IAmazonS3 s3Client = new AmazonS3Client();
    ITransferUtility s3Transfer = new TransferUtility(s3Client);
    
    PipeOptions pipeOptions = new(
        pauseWriterThreshold: PIPE_PAUSE_WRITER_THRESHOLD,
        resumeWriterThreshold: PIPE_RESUME_WRITER_THRESHOLD
    );
    Pipe pipe = new(pipeOptions);
    // Dummy token (never cancelled); see the usage note after the code
    CancellationToken ct = new();
    Task writing = FillPipeAsync(pipe.Writer, ct);
    Task reading = ReadPipeAsync(pipe.Reader, ct);
    await Task.WhenAll(reading, writing);
    
    async Task FillPipeAsync(PipeWriter writer, CancellationToken ct)
    {
        try
        {
            await DownloadAndZipFiles(writer, ct);
        }
        finally
        {
            await writer.CompleteAsync();
        }
    }
    
    async Task ReadPipeAsync(PipeReader reader, CancellationToken ct)
    {
        try
        {
            await UploadArchive(reader, ct);
        }
        finally
        {
            await reader.CompleteAsync();
        }
    }
    
    async Task DownloadAndZipFiles(PipeWriter writer, CancellationToken ct)
    {
        List<S3Object> s3Objects = await ListS3Objects(ct);
    
        await using Stream s = writer.AsStream();
        // leaveOpen: true so disposing the archive does not close the pipe stream;
        // the final archive bytes still need to be flushed after disposal
        using (ZipArchive zipArchive = new(s, ZipArchiveMode.Create, leaveOpen: true))
        {
            foreach (S3Object s3Object in s3Objects)
            {
                // Download file and add ZIP archive entry
                await DownloadAndZipObject(s3Object, zipArchive, ct);
    
                // Flush the compressed data to the pipe
                FlushResult flushResult = await writer.FlushAsync(ct);
                if (flushResult.IsCompleted)
                {
                    // reader has exited without processing all data: abort execution
                    break;
                }
            }
        }
    
        // Flush the final archive bytes (the central directory), which are
        // written when the ZipArchive is disposed; if the reader has already
        // exited, we are about to return anyway, so the flush result is ignored
        await writer.FlushAsync(ct);
    }
    
    async Task<List<S3Object>> ListS3Objects(CancellationToken ct)
    {
        // ListObjectsV2 returns at most 1000 keys per call, so paginate with
        // the continuation token until all matching objects have been listed
        List<S3Object> s3Objects = new();
        ListObjectsV2Request req = new()
        {
            BucketName = SOURCE_BUCKET_NAME,
            Prefix = SOURCE_PREFIX,
        };
        ListObjectsV2Response res;
        do
        {
            res = await s3Client.ListObjectsV2Async(req, ct);
            s3Objects.AddRange(res.S3Objects);
            req.ContinuationToken = res.NextContinuationToken;
        } while (res.IsTruncated);
        return s3Objects;
    }
    
    async Task DownloadAndZipObject(S3Object obj, ZipArchive a, CancellationToken ct)
    {
        await using Stream s3FileStream = await s3Transfer.OpenStreamAsync(
            obj.BucketName, obj.Key, ct
        );
        string entryName = SanitizePhysicalFilePath(obj.Key);
        ZipArchiveEntry entry = a.CreateEntry(entryName);
        await using Stream entryStream = entry.Open();
        await s3FileStream.CopyToAsync(entryStream, ct);
    }
    
    async Task UploadArchive(PipeReader reader, CancellationToken ct)
    {
        // The pipe's reader stream is not seekable; it is consumed sequentially
        // as the writer produces data
        await using Stream s = reader.AsStream();
        TransferUtilityUploadRequest req = new()
        {
            BucketName = TARGET_BUCKET_NAME,
            Key = TARGET_OBJECT_KEY,
            InputStream = s,
            ContentType = "application/zip",
        };
        await s3Transfer.UploadAsync(req, ct);
    }
    
    static string SanitizePhysicalFilePath(string filePath)
    {
        // Keep the '/' separators (they become folders inside the archive) but
        // strip characters that are invalid in file names on the local OS
        char[] invalids = Path.GetInvalidFileNameChars();
    
        string[] pathSegments = filePath.Split('/');
        return string.Join('/', pathSegments.Select(x =>
        {
            string[] parts = x.Split(invalids, StringSplitOptions.RemoveEmptyEntries);
            return string.Join("_", parts).TrimEnd('.');
        }));
    }
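
    A few usage notes. The CancellationToken ct = new(); above is a dummy token that can never be cancelled; in a real job you would typically hand the two tasks a token from a CancellationTokenSource instead, for example with an overall timeout (the 30-minute value below is just an illustrative assumption):

    // Hypothetical wiring: abort the whole export if it runs longer than 30 minutes
    using CancellationTokenSource cts = new(TimeSpan.FromMinutes(30));
    CancellationToken ct = cts.Token;

    Also note that SanitizePhysicalFilePath preserves the S3 key structure as folders inside the archive, and that the set of invalid characters depends on the OS the script runs on: on Windows, for example, the key source/reports:2024/may.csv becomes the entry source/reports_2024/may.csv, while on Linux only '/' and the NUL character are invalid, so most keys pass through unchanged.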