I am using EC2 with 1 GB RAM and 8 GB storage. This code works very fast if the total size of the files is under 500-600 MB, gets slower around 1 GB, and at 10 GB it starts using less and less of my 200 Mbps connection, dropping to around 200-1000 kbps. Is there any way I can do this without using up my EC2 memory and storage?
private void streamFilesFromS3ToZipToS3(ZippingTask zippingTask) {
    try {
        // Update task status to PROCESSING
        zippingTask.setStatus(ZipTaskStatus.PROCESSING);
        zippingTaskService.save(zippingTask);

        String folderS3Path = zippingTask.getFolder().getS3Path();
        String folderName = zippingTask.getFolder().getFolderName();
        String uuid = UUID.randomUUID().toString();
        String zipKey = String.format("preSignedZips/%s/%s.zip", uuid, folderName);

        List<S3Object> s3Objects = s3Handler.getAllObjectsUnderPrefix(folderS3Path);
        int zippedFiles = 0;

        // Create and upload zip to S3 -- the entire archive is buffered in heap memory here
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try (ZipOutputStream zipOut = new ZipOutputStream(byteArrayOutputStream)) {
            for (S3Object s3Object : s3Objects) {
                String key = s3Object.key();
                // if (!key.endsWith("/")) { // Skip folders
                if (!key.equals(zippingTask.getFolder().getS3Path())) { // Skip folders
                    try (InputStream objectData = s3Handler.getObjectInputStream(key)) {
                        ZipEntry zipEntry = new ZipEntry(key.substring(folderS3Path.length()));
                        zipOut.putNextEntry(zipEntry);
                        IOUtils.copy(objectData, zipOut);
                        zipOut.closeEntry();
                        log.info("Zipped file: {}", key);
                        zippingTask.setProgress((++zippedFiles) * 100 / s3Objects.size());
                        zippingTaskService.save(zippingTask);
                    }
                }
            }
        }

        // Upload zip to S3 -- toByteArray() copies the buffer, doubling peak memory
        s3Handler.uploadInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()), zipKey);
        // String zipPreSignedUrl = s3Handler.generatePreSignedUrlForSingleFile(zipKey);
        String zipPreSignedUrl = s3Handler.generatePreSignedUrlForSingleFileWithExpirationDuration(zipKey, Duration.ofDays(1));

        // Update task status
        zippingTask.setStatus(ZipTaskStatus.COMPLETE);
        zippingTask.setZipS3Key(zipKey);
        zippingTask.setPresignedUrl(zipPreSignedUrl);
        zippingTaskService.save(zippingTask);

        // Send email
        String userEmail = zippingTask.getUser().getEmail();
        emailServices.sendZipCompletionEmail(userEmail, folderName, zipPreSignedUrl);
    } catch (Exception e) {
        log.error("Error processing zip task: {}", e.getMessage());
        zippingTask.setStatus(ZipTaskStatus.ERROR);
        zippingTask.setErrorMessage(e.getMessage());
        zippingTaskService.save(zippingTask);
    }
}

public void uploadInputStream(ByteArrayInputStream byteArrayInputStream, String zipKey) {
    PutObjectRequest putObjectRequest = PutObjectRequest.builder()
            .bucket(bucketName)
            .key(zipKey)
            .build();
    // available() gives the full remaining size only because this is a ByteArrayInputStream
    RequestBody requestBody = RequestBody.fromInputStream(byteArrayInputStream, byteArrayInputStream.available());
    s3Client.putObject(putObjectRequest, requestBody);
}
I have finally done it, using PipedOutputStream and PipedInputStream. The original version buffered the entire ZIP in a ByteArrayOutputStream, so on a 1 GB instance anything bigger than the heap caused GC pressure and swapping, which is why throughput collapsed. With a pipe, one thread zips while another uploads multipart chunks, so memory stays bounded by the pipe buffer plus one part buffer regardless of total size.
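Before the full method, here is a minimal, self-contained sketch of the pattern (class name and sizes are illustrative only, not from my actual code): one thread writes into a PipedOutputStream while another drains the connected PipedInputStream, and the fixed pipe buffer is the only memory the transfer needs.

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeDemo {
    public static void main(String[] args) throws Exception {
        PipedOutputStream pipedOut = new PipedOutputStream();
        // Second argument is the pipe's ring buffer size; the writer blocks when it is full
        PipedInputStream pipedIn = new PipedInputStream(pipedOut, 1024 * 1024);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    pipedOut.write(new byte[64 * 1024]); // stand-in for zipped data
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    pipedOut.close(); // signals end-of-stream to the reader
                } catch (IOException ignore) {
                }
            }
        });

        Thread consumer = new Thread(() -> {
            byte[] buf = new byte[8 * 1024];
            long total = 0;
            try {
                int n;
                while ((n = pipedIn.read(buf)) != -1) {
                    total += n; // stand-in for an S3 uploadPart call
                }
                System.out.println("consumed " + total + " bytes");
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}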
// This function zips the files from S3 and streams the zip back up to S3
public void streamFromS3ToS3ViaZip(ZippingTask zippingTask) {
    // Update task status to PROCESSING
    zippingTask.setStatus(ZipTaskStatus.PROCESSING);
    zippingTaskService.save(zippingTask);

    String folderS3Path = zippingTask.getFolder().getS3Path();
    String folderName = zippingTask.getFolder().getFolderName();
    String uuid = UUID.randomUUID().toString().split("-")[0];
    String zipKey = String.format("preSignedZips/%s/%s.zip", uuid, folderName);
    final int zipExpirationDays = 3;

    final AtomicBoolean zipFailed = new AtomicBoolean(false);
    final AtomicReference<Exception> zipException = new AtomicReference<>();
    final AtomicBoolean uploadFailed = new AtomicBoolean(false);
    final AtomicReference<Exception> uploadException = new AtomicReference<>();
    final AtomicLong totalOriginalBytes = new AtomicLong(0);
    final AtomicLong totalCompressedBytes = new AtomicLong(0);

    try {
        // Create an S3 multipart upload session
        CreateMultipartUploadResponse createResponse = s3Handler.getS3Client().createMultipartUpload(
                CreateMultipartUploadRequest.builder()
                        .bucket(s3Handler.getBucketName())
                        .key(zipKey)
                        .contentType("application/zip")
                        .build()
        );
        String uploadId = createResponse.uploadId();

        // Use a pipe with a reasonably sized buffer; the zip thread blocks when it
        // is full, so memory use stays bounded no matter how large the archive gets
        final PipedOutputStream pipedOut = new PipedOutputStream();
        final PipedInputStream pipedIn = new PipedInputStream(pipedOut, 8 * 1024 * 1024); // 8 MB buffer

        Thread zipThread = new Thread(() -> {
            try (
                    BufferedOutputStream bufferedOut = new BufferedOutputStream(pipedOut, 1024 * 1024);
                    ZipOutputStream zipOut = new ZipOutputStream(bufferedOut)
            ) {
                // Store entries without compression for maximum throughput
                zipOut.setLevel(Deflater.NO_COMPRESSION);

                List<S3Object> files = s3Handler.getAllObjectsUnderPrefix(folderS3Path);
                int totalFiles = files.size();
                long totalFileSize = files.stream().mapToLong(S3Object::size).sum();
                long zippedFileSize = 0;
                int processedFiles = 0;

                for (S3Object s3Object : files) {
                    // Skip the folder placeholder object itself
                    String key = s3Object.key();
                    if (key.equals(folderS3Path)) {
                        processedFiles++;
                        continue;
                    }
                    String zipEntryName = key.substring(folderS3Path.length());
                    if (zipEntryName.startsWith("/")) {
                        zipEntryName = zipEntryName.substring(1);
                    }
                    try (InputStream s3In = s3Handler.getObjectInputStream(key)) {
                        zipOut.putNextEntry(new ZipEntry(zipEntryName));
                        byte[] buffer = new byte[1024 * 1024]; // 1 MB buffer for reading
                        int len;
                        long fileSize = 0;
                        while ((len = s3In.read(buffer)) > 0) {
                            zipOut.write(buffer, 0, len);
                            fileSize += len;
                        }
                        totalOriginalBytes.addAndGet(fileSize);
                        zipOut.closeEntry();
                        zippedFileSize += s3Object.size();
                        processedFiles++;
                        if (processedFiles % 10 == 0 || processedFiles == totalFiles) {
                            int progress = totalFileSize > 0 ? (int) (zippedFileSize * 100 / totalFileSize) : 0;
                            zippingTask.setProgress(progress);
                            zippingTaskService.save(zippingTask);
                        }
                    }
                }
                zipOut.finish();
                log.info("📦 ZIP creation finished - Original data size: {} MB",
                        String.format("%.2f", totalOriginalBytes.get() / (1024.0 * 1024.0)));
            } catch (Exception e) {
                zipFailed.set(true);
                zipException.set(e);
                log.error("❌ Error zipping: {}", e.getMessage(), e);
            } finally {
                try {
                    pipedOut.close(); // signals end-of-stream to the upload thread
                } catch (IOException ignore) {
                }
            }
        }, "zip-thread-" + zipKey);

        Thread uploadThread = new Thread(() -> {
            List<CompletedPart> completedParts = new ArrayList<>();
            int partNumber = 1;
            long uploadedBytes = 0;
            long startTime = System.currentTimeMillis();
            try {
                // Chunked upload - AWS requires a minimum of 5 MB per part except for the last part
                final int PART_SIZE = 10 * 1024 * 1024; // 10 MB chunks
                byte[] buffer = new byte[PART_SIZE];
                int bytesRead;
                while ((bytesRead = readFully(pipedIn, buffer, 0, buffer.length)) > 0) {
                    ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead);
                    // Upload part
                    UploadPartResponse uploadPartResponse = s3Handler.getS3Client().uploadPart(
                            UploadPartRequest.builder()
                                    .bucket(s3Handler.getBucketName())
                                    .key(zipKey)
                                    .uploadId(uploadId)
                                    .partNumber(partNumber)
                                    .contentLength((long) bytesRead)
                                    .build(),
                            RequestBody.fromByteBuffer(byteBuffer)
                    );
                    completedParts.add(
                            CompletedPart.builder()
                                    .partNumber(partNumber)
                                    .eTag(uploadPartResponse.eTag())
                                    .build()
                    );
                    uploadedBytes += bytesRead;
                    totalCompressedBytes.set(uploadedBytes);
                    log.debug("Uploaded part {} - {} MB (total: {} MB)",
                            partNumber,
                            String.format("%.2f", bytesRead / (1024.0 * 1024.0)),
                            String.format("%.2f", uploadedBytes / (1024.0 * 1024.0)));
                    partNumber++;
                }

                // Complete the multipart upload
                s3Handler.getS3Client().completeMultipartUpload(
                        CompleteMultipartUploadRequest.builder()
                                .bucket(s3Handler.getBucketName())
                                .key(zipKey)
                                .uploadId(uploadId)
                                .multipartUpload(
                                        CompletedMultipartUpload.builder()
                                                .parts(completedParts)
                                                .build()
                                )
                                .build()
                );

                long endTime = System.currentTimeMillis();
                double uploadSizeMB = uploadedBytes / (1024.0 * 1024.0);
                double durationSeconds = (endTime - startTime) / 1000.0;
                double mbPerSecond = durationSeconds > 0 ? uploadSizeMB / durationSeconds : 0;
                double compressionRatio = totalOriginalBytes.get() > 0 ?
                        (double) uploadedBytes / totalOriginalBytes.get() * 100 : 0;
                log.info("📤 Upload complete: {} - Original: {} MB, Compressed: {} MB ({}%), Duration: {} seconds, Speed: {} MB/s",
                        zipKey,
                        String.format("%.2f", totalOriginalBytes.get() / (1024.0 * 1024.0)),
                        String.format("%.2f", uploadSizeMB),
                        String.format("%.1f", compressionRatio),
                        String.format("%.2f", durationSeconds),
                        String.format("%.2f", mbPerSecond));
            } catch (Exception e) {
                uploadFailed.set(true);
                uploadException.set(e);
                log.error("❌ Error uploading: {}", e.getMessage(), e);
                // Abort the multipart upload on failure
                try {
                    s3Handler.getS3Client().abortMultipartUpload(
                            AbortMultipartUploadRequest.builder()
                                    .bucket(s3Handler.getBucketName())
                                    .key(zipKey)
                                    .uploadId(uploadId)
                                    .build()
                    );
                } catch (Exception abortException) {
                    log.error("Failed to abort multipart upload: {}", abortException.getMessage());
                }
            } finally {
                try {
                    pipedIn.close();
                } catch (IOException ignore) {
                }
            }
        }, "upload-thread-" + zipKey);

        zipThread.setPriority(Thread.MAX_PRIORITY);
        uploadThread.setPriority(Thread.MAX_PRIORITY);
        uploadThread.start();
        zipThread.start();
        zipThread.join();
        uploadThread.join();

        // Surface worker-thread failures BEFORE marking the task complete and sending the email
        if (zipFailed.get() && zipException.get() != null) {
            throw zipException.get();
        }
        if (uploadFailed.get() && uploadException.get() != null) {
            throw uploadException.get();
        }

        String zipPreSignedUrl = s3Handler.generatePreSignedUrlForSingleFileWithExpirationDuration(zipKey, Duration.ofDays(zipExpirationDays));

        // Update task status
        zippingTask.setStatus(ZipTaskStatus.COMPLETE);
        zippingTask.setZipS3Key(zipKey);
        zippingTask.setPresignedUrl(zipPreSignedUrl);
        zippingTask.setExpiresAt(LocalDateTime.now(ZoneOffset.UTC).plusDays(zipExpirationDays));
        zippingTaskService.save(zippingTask);

        // Send email
        String userEmail = zippingTask.getUser().getEmail();
        emailServices.sendZipCompletionEmail(userEmail, folderName, zipPreSignedUrl);
    } catch (Exception e) {
        log.error("💥 Failed zip+upload: {}", e.getMessage(), e);
        zippingTask.setStatus(ZipTaskStatus.ERROR);
        zippingTask.setErrorMessage(e.getMessage());
        zippingTaskService.save(zippingTask);
        throw new RuntimeException("Failed to zip and upload to S3", e);
    }
}
// Helper method that keeps reading until the buffer is full or the stream ends,
// since a single InputStream.read() may return fewer bytes than requested
private int readFully(InputStream input, byte[] buffer, int offset, int length) throws IOException {
    int bytesRead = 0;
    int result;
    while (bytesRead < length) {
        result = input.read(buffer, offset + bytesRead, length - bytesRead);
        if (result == -1) {
            break;
        }
        bytesRead += result;
    }
    return bytesRead;
}
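One note on readFully: a plain read() on a pipe routinely returns short counts, and S3 rejects multipart parts smaller than 5 MB except for the last one, so each part buffer has to be filled completely before uploading. A standalone toy example of the chunking behavior (class name and sizes are made up for illustration):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadFullyDemo {
    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[25]);
        byte[] chunk = new byte[10];
        int n;
        // Prints chunks of 10, 10, and 5 bytes: only the final chunk comes up short,
        // mirroring the S3 rule that only the last part may be under the minimum size
        while ((n = readFully(in, chunk, 0, chunk.length)) > 0) {
            System.out.println("chunk of " + n + " bytes");
        }
    }

    static int readFully(InputStream input, byte[] buffer, int offset, int length) throws IOException {
        int bytesRead = 0;
        int result;
        while (bytesRead < length) {
            result = input.read(buffer, offset + bytesRead, length - bytesRead);
            if (result == -1) {
                break;
            }
            bytesRead += result;
        }
        return bytesRead;
    }
}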