java spring amazon-s3

Dynamically build a zip archive on AWS S3 using the Java SDK?


I currently have several thousand objects on S3. Using a Spring service, the goal is to take an arbitrary list of object keys as input and compile the corresponding objects into a zip archive in a separate folder in the same bucket for later download.

Building the zip locally is not an option, because the final archive can be tens of GB. I have read through the AWS Java SDK docs and I don't see a way of building a zip dynamically: all of the put/upload functions want a known content length up front, which I don't have.

I essentially need a zip "placeholder" on S3 that I can stream to without having to keep the desired S3 objects in memory or on disk. How can I do this?


Solution

  • What I ended up doing was adding a class that extends OutputStream and starts a multipart upload once its internal buffer reaches a set limit (I use 10 MB; the AWS minimum part size is 5 MB).

    https://gist.github.com/mjohnston-vtx/411b417a5c4b4e053961acf719885085

    import org.jetbrains.annotations.NotNull;
    import software.amazon.awssdk.core.sync.RequestBody;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
    import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
    import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
    import software.amazon.awssdk.services.s3.model.CompletedPart;
    import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
    import software.amazon.awssdk.services.s3.model.PutObjectRequest;
    import software.amazon.awssdk.services.s3.model.UploadPartRequest;
    
    import java.io.ByteArrayInputStream;
    import java.io.OutputStream;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Provides an OutputStream which uses the S3Client to support streaming large files directly to S3 without
     * needing to keep the object in local memory or storage.
     * <p>
     * <a href="https://gist.github.com/blagerweij/ad1dbb7ee2fff8bcffd372815ad310eb#file-s3outputstream-java">
     * Adapted for AWS SDK V2 from this Github Gist
     * </a>
     */
    public class S3OutputStreamWrapper extends OutputStream {
        // A 10 MB buffer stays above the AWS minimum part size of 5 MB and keeps the number of UploadPart calls down.
        // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
        protected static final int BUFFER_SIZE = 10_000_000;
    
        private final S3Client s3Client;
        private final String bucket;
        private final String key;
        private final byte[] buf;
        private final List<String> partETags;
    
        private int bufferPos;
        private String uploadId;
        private boolean streamIsOpen;
    
        public S3OutputStreamWrapper(S3Client s3Client, String bucket, String key) {
            this.s3Client = s3Client;
            this.bucket = bucket;
            this.key = key;
            this.buf = new byte[BUFFER_SIZE];
            this.bufferPos = 0;
            this.partETags = new ArrayList<>();
            this.streamIsOpen = true;
        }
    
        @Override
        public void write(int b) {
            assertOpen();
            if (bufferPos >= buf.length) {
                flushBufferAndRewind();
            }
            buf[bufferPos++] = (byte) b;
        }
    
        @Override
        public void write(@NotNull byte[] bytes) {
            write(bytes, 0, bytes.length);
        }
    
        @Override
        public void write(@NotNull byte[] bytes, int o, int l) {
            assertOpen();
    
            var offset = o;
            var length = l;
            int size;
            // Fill and flush the buffer one full part at a time until the remaining input fits in the buffer.
            // Without the flushBufferAndRewind() call, this loop would spin forever once the buffer filled up.
            while (length > (size = buf.length - bufferPos)) {
                System.arraycopy(bytes, offset, buf, bufferPos, size);
                bufferPos += size;
                flushBufferAndRewind();
                offset += size;
                length -= size;
            }
            System.arraycopy(bytes, offset, buf, bufferPos, length);
            bufferPos += length;
        }
    
        // Parts are uploaded automatically whenever the buffer fills, so flush() only verifies the stream is still open.
        @Override
        public synchronized void flush() {
            assertOpen();
        }
    
        @Override
        public void close() {
            if (streamIsOpen) {
                streamIsOpen = false;
                // If a multipart upload was started, flush any remainder and complete it;
                // otherwise everything fit in one buffer and a single PutObject is enough.
                if (null != uploadId) {
                    if (bufferPos > 0) {
                        uploadPart();
                    }
    
                    var completedParts = new CompletedPart[partETags.size()];
                    for (int i = 0; i < partETags.size(); i++) {
                        completedParts[i] = CompletedPart.builder().eTag(partETags.get(i)).partNumber(i + 1).build();
                    }
                    s3Client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
                        .bucket(bucket)
                        .key(key)
                        .uploadId(uploadId)
                        .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
                        .build()
                    );
                } else {
                    s3Client.putObject(
                        PutObjectRequest.builder()
                            .bucket(bucket)
                            .key(key)
                            .contentLength(Integer.toUnsignedLong(bufferPos))
                            .build(),
                        RequestBody.fromInputStream(new ByteArrayInputStream(buf, 0, bufferPos), bufferPos)
                    );
                }
            }
        }
    
        public void cancel() {
            streamIsOpen = false;
            if (null != uploadId) {
                s3Client.abortMultipartUpload(AbortMultipartUploadRequest.builder()
                    .bucket(bucket).key(key).uploadId(uploadId).build()
                );
            }
        }
    
        protected void flushBufferAndRewind() {
            // Lazily start the multipart upload the first time the buffer fills.
            if (null == uploadId) {
                final var response = s3Client.createMultipartUpload(
                    CreateMultipartUploadRequest.builder().bucket(bucket).key(key).build()
                );
    
                if (response.sdkHttpResponse().isSuccessful()) {
                    this.uploadId = response.uploadId();
                } else {
                    throw new RuntimeException(
                        "[HTTP " + response.sdkHttpResponse().statusCode() + "] Failed to create multipart upload."
                    );
                }
            }
    
            uploadPart();
            bufferPos = 0;
        }
    
        protected void uploadPart() {
            var response = s3Client.uploadPart(
                UploadPartRequest.builder()
                    .uploadId(uploadId)
                    .bucket(bucket)
                    .key(key)
                    .partNumber(partETags.size() + 1)
                    .build(),
                RequestBody.fromInputStream(new ByteArrayInputStream(buf, 0, bufferPos), bufferPos)
            );
            partETags.add(response.eTag());
        }
    
        private void assertOpen() {
            if (!streamIsOpen) {
                throw new IllegalStateException("Stream is closed.");
            }
        }
    }
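
    One caveat worth calling out: an unfinished multipart upload keeps its uploaded parts (and their storage charges) until it is aborted, which is what cancel() above is for. A minimal sketch of the error handling I would wrap around a write, where writeArchive is a hypothetical placeholder for whatever produces the stream's content:

    var out = new S3OutputStreamWrapper(s3Client, bucketName, archiveKey);
    try {
        writeArchive(out); // hypothetical: stream your content into the wrapper here
        out.close();       // completes the multipart upload (or falls back to a single PutObject)
    } catch (Exception e) {
        out.cancel();      // aborts the multipart upload so no orphaned parts linger
        throw e;
    }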
    

    This allows you to do GET requests for different objects and pipe them to this OutputStream:

    var keyList = List.of("s3Key1", ...);
    var zos = new ZipOutputStream(new S3OutputStreamWrapper(s3Client, bucketName, archiveKey));
    for (var key : keyList) {
        zos.putNextEntry(new ZipEntry(key));
        var imageBytes = s3Client.getObject(...).readAllBytes();
        zos.write(imageBytes, 0, imageBytes.length);
        zos.closeEntry();
    }
    zos.close();
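
    Note that readAllBytes() still pulls each source object fully into memory before writing it to the zip. Since getObject(GetObjectRequest) returns the response body as an InputStream, each object can be streamed straight through instead; a sketch of the same loop (this assumes an import of software.amazon.awssdk.services.s3.model.GetObjectRequest):

    for (var key : keyList) {
        zos.putNextEntry(new ZipEntry(key));
        try (var in = s3Client.getObject(GetObjectRequest.builder().bucket(bucketName).key(key).build())) {
            in.transferTo(zos); // copies in small chunks, so only one chunk is in memory at a time
        }
        zos.closeEntry();
    }
    zos.close();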