I currently have several thousand objects in S3. Using a Spring service, the goal is to take an arbitrary list of object keys as input and compile those objects into a zip archive in a separate folder in the same bucket for later download.
Building the zip locally is not an option, because the final zip can be tens of GB. I have read through the AWS Java SDK docs and I don't see a way of building a zip dynamically: all of the put/upload methods want a known content length up front (which I don't have).
I essentially need a zip "placeholder" on S3 that I can stream to without having to keep the desired S3 objects in memory or on disk. How can I do this?
What I ended up doing is adding a class that extends OutputStream and creates a multipart upload once the buffer reaches a set limit (in my case 10MB; the AWS minimum for a part is 5MB). With 10MB parts and S3's cap of 10,000 parts per multipart upload, that allows archives of up to roughly 100GB.
https://gist.github.com/mjohnston-vtx/411b417a5c4b4e053961acf719885085
import org.jetbrains.annotations.NotNull;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import java.io.ByteArrayInputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
/**
* Provides an OutputStream which uses the S3Client to support streaming large files directly to S3 without
* needing to keep the object in local memory or storage.
* <p>
* <a href="https://gist.github.com/blagerweij/ad1dbb7ee2fff8bcffd372815ad310eb#file-s3outputstream-java">
* Adapted for AWS SDK V2 from this Github Gist
* </a>
*/
public class S3OutputStreamWrapper extends OutputStream {

    // A 10MB buffer stays above the AWS minimum of 5MB per part and should prevent having to make too many calls.
    // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
    protected static final int BUFFER_SIZE = 10_000_000;

    private final S3Client s3Client;
    private final String bucket;
    private final String key;
    private final byte[] buf;
    private final List<String> partETags;
    private int bufferPos;
    private String uploadId;
    private boolean streamIsOpen;

    public S3OutputStreamWrapper(S3Client s3Client, String bucket, String key) {
        this.s3Client = s3Client;
        this.bucket = bucket;
        this.key = key;
        this.buf = new byte[BUFFER_SIZE];
        this.bufferPos = 0;
        this.partETags = new ArrayList<>();
        this.streamIsOpen = true;
    }

    @Override
    public void write(int b) {
        assertOpen();
        if (bufferPos >= buf.length) {
            flushBufferAndRewind();
        }
        buf[bufferPos++] = (byte) b;
    }

    @Override
    public void write(@NotNull byte[] bytes) {
        write(bytes, 0, bytes.length);
    }

    @Override
    public void write(@NotNull byte[] bytes, int o, int l) {
        assertOpen();
        var offset = o;
        var length = l;
        int size;
        // Fill the buffer and ship a part each time it runs out of room.
        while (length > (size = buf.length - bufferPos)) {
            System.arraycopy(bytes, offset, buf, bufferPos, size);
            bufferPos += size;
            flushBufferAndRewind();
            offset += size;
            length -= size;
        }
        System.arraycopy(bytes, offset, buf, bufferPos, length);
        bufferPos += length;
    }

    /**
     * Intentionally a no-op: every part except the last must be at least 5MB,
     * so a partially filled buffer can't be pushed to S3 early.
     */
    @Override
    public synchronized void flush() {
        assertOpen();
    }

    /**
     * Uploads any remaining buffered bytes and completes the multipart upload.
     * If the content never exceeded one buffer, falls back to a single PutObject.
     */
    @Override
    public void close() {
        if (streamIsOpen) {
            streamIsOpen = false;
            if (null != uploadId) {
                if (bufferPos > 0) {
                    uploadPart();
                }
                var completedParts = new CompletedPart[partETags.size()];
                for (int i = 0; i < partETags.size(); i++) {
                    completedParts[i] = CompletedPart.builder().eTag(partETags.get(i)).partNumber(i + 1).build();
                }
                s3Client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
                        .bucket(bucket)
                        .key(key)
                        .uploadId(uploadId)
                        .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
                        .build()
                );
            } else {
                s3Client.putObject(
                        PutObjectRequest.builder()
                                .bucket(bucket)
                                .key(key)
                                .contentLength(Integer.toUnsignedLong(bufferPos))
                                .build(),
                        RequestBody.fromInputStream(new ByteArrayInputStream(buf, 0, bufferPos), bufferPos)
                );
            }
        }
    }

    /**
     * Aborts the multipart upload (if one was started) so the already-uploaded
     * parts don't linger and accrue storage charges.
     */
    public void cancel() {
        streamIsOpen = false;
        if (null != uploadId) {
            s3Client.abortMultipartUpload(AbortMultipartUploadRequest.builder()
                    .bucket(bucket).key(key).uploadId(uploadId).build()
            );
        }
    }

    protected void flushBufferAndRewind() {
        // The multipart upload is created lazily on the first full buffer;
        // smaller payloads are handled by close() with a plain PutObject.
        if (null == uploadId) {
            final var response = s3Client.createMultipartUpload(
                    CreateMultipartUploadRequest.builder().bucket(bucket).key(key).build()
            );
            if (response.sdkHttpResponse().isSuccessful()) {
                this.uploadId = response.uploadId();
            } else {
                throw new RuntimeException(
                        "[HTTP " + response.sdkHttpResponse().statusCode() + "] Failed to create multipart upload."
                );
            }
        }
        uploadPart();
        bufferPos = 0;
    }

    protected void uploadPart() {
        // Part numbers are 1-based; the ETag of each uploaded part is kept for completion.
        var response = s3Client.uploadPart(
                UploadPartRequest.builder()
                        .uploadId(uploadId)
                        .bucket(bucket)
                        .key(key)
                        .partNumber(partETags.size() + 1)
                        .build(),
                RequestBody.fromInputStream(new ByteArrayInputStream(buf, 0, bufferPos), bufferPos)
        );
        partETags.add(response.eTag());
    }

    private void assertOpen() {
        if (!streamIsOpen) {
            throw new IllegalStateException("Stream is closed.");
        }
    }
}
This allows you to GET each of the desired objects and pipe them into this OutputStream via a ZipOutputStream:
var keyList = List.of("s3Key1", ...);
var zos = new ZipOutputStream(new S3OutputStreamWrapper(s3Client, bucketName, archiveKey));
for (var key : keyList) {
    zos.putNextEntry(new ZipEntry(key));
    var imageBytes = s3Client.getObject(...).readAllBytes();
    zos.write(imageBytes, 0, imageBytes.length);
    zos.closeEntry();
}
zos.close(); // finishes the zip and completes the multipart upload
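Note that readAllBytes() still pulls each source object fully into memory. If the individual objects can themselves be large, a streaming variant is possible; here's a rough sketch (reusing the bucketName, archiveKey, and keyList placeholders from above, with GetObjectRequest and SdkException imported from their SDK packages) that pipes each GET response straight into the zip entry via InputStream.transferTo and aborts the upload if anything fails:
var s3Stream = new S3OutputStreamWrapper(s3Client, bucketName, archiveKey);
var zos = new ZipOutputStream(s3Stream);
try {
    for (var key : keyList) {
        zos.putNextEntry(new ZipEntry(key));
        // Stream the object body straight into the zip entry; only the
        // wrapper's 10MB part buffer is ever held in memory.
        try (var body = s3Client.getObject(
                GetObjectRequest.builder().bucket(bucketName).key(key).build())) {
            body.transferTo(zos);
        }
        zos.closeEntry();
    }
    zos.close(); // completes the multipart upload
} catch (IOException | SdkException e) {
    s3Stream.cancel(); // abort so orphaned parts don't accrue storage charges
    throw e;
}
The explicit close() inside the try block matters: with try-with-resources, a failure would close the ZipOutputStream first and complete the half-written upload before cancel() had a chance to abort it.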