java, amazon-web-services, amazon-s3, zip

How do I stream S3 objects into a zip stream, and the zip stream back to S3, in Java Spring Boot?


I am using EC2 with 1 GB RAM and 8 GB storage. This code works very fast if the total size of the files is under 500-600 MB, is slower at around 1 GB, and if the size is 10 GB it starts using less and less of my 200 Mbps connection, dropping to around 200-1000 kbps. Is there any way I can do this without using up my EC2 memory and storage?

 private void streamFilesFromS3ToZipToS3(ZippingTask zippingTask) {
        try {
            // Update task status to PROCESSING
            zippingTask.setStatus(ZipTaskStatus.PROCESSING);
            zippingTaskService.save(zippingTask);

            String folderS3Path = zippingTask.getFolder().getS3Path();
            String folderName = zippingTask.getFolder().getFolderName();
            String uuid = UUID.randomUUID().toString();
            String zipKey = String.format("preSignedZips/%s/%s.zip", uuid, folderName);

            List<S3Object> s3Objects = s3Handler.getAllObjectsUnderPrefix(folderS3Path);
            int zippedFiles = 0;

            // Build the entire zip in memory before uploading
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try (ZipOutputStream zipOut = new ZipOutputStream(byteArrayOutputStream)) {
                for (S3Object s3Object : s3Objects) {
                    String key = s3Object.key();
//                    if (!key.endsWith("/") ) { // Skip folders
                    if (!key.equals(zippingTask.getFolder().getS3Path())) { // Skip folders
                        try (InputStream objectData = s3Handler.getObjectInputStream(key)) {
                            ZipEntry zipEntry = new ZipEntry(key.substring(folderS3Path.length()));
                            zipOut.putNextEntry(zipEntry);
                            IOUtils.copy(objectData, zipOut);
                            zipOut.closeEntry();

                            log.info("Zipped file: {}", key);

                            zippingTask.setProgress((++zippedFiles) * 100 / s3Objects.size());
                            zippingTaskService.save(zippingTask);
                        }
                    }
                }
            }

            // Upload zip to S3
            s3Handler.uploadInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()), zipKey);
//            String zipPreSignedUrl = s3Handler.generatePreSignedUrlForSingleFile(zipKey);
            String zipPreSignedUrl = s3Handler.generatePreSignedUrlForSingleFileWithExpirationDuration(zipKey, Duration.ofDays(1));

            // Update task status
            zippingTask.setStatus(ZipTaskStatus.COMPLETE);
            zippingTask.setZipS3Key(zipKey);
            zippingTask.setPresignedUrl(zipPreSignedUrl);
            zippingTaskService.save(zippingTask);

            // Send email
            String userEmail = zippingTask.getUser().getEmail();
            emailServices.sendZipCompletionEmail(userEmail, folderName, zipPreSignedUrl);

        } catch (Exception e) {
            log.error("Error processing zip task: {}", e.getMessage(), e);
            zippingTask.setStatus(ZipTaskStatus.ERROR);
            zippingTask.setErrorMessage(e.getMessage());
            zippingTaskService.save(zippingTask);
        }
    }
    public void uploadInputStream(ByteArrayInputStream byteArrayInputStream, String zipKey) {
        PutObjectRequest putObjectRequest = PutObjectRequest.builder()
                .bucket(bucketName)
                .key(zipKey)
                .build();

        RequestBody requestBody = RequestBody.fromInputStream(byteArrayInputStream, byteArrayInputStream.available());
        s3Client.putObject(putObjectRequest, requestBody);
    }

Solution

  • I finally got it working, using PipedOutputStream and PipedInputStream:

    // Streams files from S3 into a zip and uploads the zip back to S3
        public void streamFromS3ToS3ViaZip(ZippingTask zippingTask) {
            // Update task status to PROCESSING
            zippingTask.setStatus(ZipTaskStatus.PROCESSING);
            zippingTaskService.save(zippingTask);
    
            String folderS3Path = zippingTask.getFolder().getS3Path();
            String folderName = zippingTask.getFolder().getFolderName();
            String uuid = UUID.randomUUID().toString().split("-")[0];
            String zipKey = String.format("preSignedZips/%s/%s.zip", uuid, folderName);
            final int zipExpirationDays = 3;
    
    
            final CountDownLatch zipCompletionLatch = new CountDownLatch(1);
            final AtomicBoolean zipFailed = new AtomicBoolean(false);
            final AtomicReference<Exception> zipException = new AtomicReference<>();
            final AtomicLong totalOriginalBytes = new AtomicLong(0);
            final AtomicLong totalCompressedBytes = new AtomicLong(0);
    
            try {
                // Create an S3 multipart upload session
                CreateMultipartUploadResponse createResponse = s3Handler.getS3Client().createMultipartUpload(
                        CreateMultipartUploadRequest.builder()
                                .bucket(s3Handler.getBucketName())
                                .key(zipKey)
                                .contentType("application/zip")
                                .build()
                );
                String uploadId = createResponse.uploadId();
    
                // Use a pipe with a reasonably sized buffer
                final PipedOutputStream pipedOut = new PipedOutputStream();
                final PipedInputStream pipedIn = new PipedInputStream(pipedOut, 8 * 1024 * 1024); // 8MB buffer
    
                Thread zipThread = new Thread(() -> {
                    try (
                            BufferedOutputStream bufferedOut = new BufferedOutputStream(pipedOut, 1 * 1024 * 1024);
                            ZipOutputStream zipOut = new ZipOutputStream(bufferedOut)
                    ) {
                        // Store entries uncompressed (NO_COMPRESSION) for maximum throughput
                        zipOut.setLevel(Deflater.NO_COMPRESSION);
    
                        List<S3Object> files = s3Handler.getAllObjectsUnderPrefix(folderS3Path);
                        int totalFiles = files.size();
                        long totalFileSize = files.stream().mapToLong(S3Object::size).sum();
                        long zippedFileSize = 0;
                        int processedFiles = 0;
    
                        for (S3Object s3Object : files) {
    //                        log.info("Processing file: {}, size : {} MB", s3Object.key(), String.format("%.2f", s3Object.size() / (1024.0 * 1024.0)));
                            // Skip folders
                            String key = s3Object.key();
                            if (key.equals(folderS3Path)) {
                                processedFiles++;
                                continue;
                            }
    
                            String zipEntryName = key.substring(folderS3Path.length());
                            if (zipEntryName.startsWith("/")) {
                                zipEntryName = zipEntryName.substring(1);
                            }
    
                            try (InputStream s3In = s3Handler.getObjectInputStream(key)) {
                                zipOut.putNextEntry(new ZipEntry(zipEntryName));
                                byte[] buffer = new byte[1 * 1024 * 1024]; // 1MB buffer for reading
                                int len;
                                long fileSize = 0;
                                while ((len = s3In.read(buffer)) > 0) {
                                    zipOut.write(buffer, 0, len);
                                    fileSize += len;
                                }
                                totalOriginalBytes.addAndGet(fileSize);
                                zipOut.closeEntry();
    
                                zippedFileSize += s3Object.size();
    
                                processedFiles++;
                                if (processedFiles % 10 == 0 || processedFiles == totalFiles) {
    //                                log.info("ZIP progress: {}/{} files ({} %), Total original data: {} MB",
    //                                        processedFiles, totalFiles,
    //                                        totalFiles > 0 ? (processedFiles * 100 / totalFiles) : 0,
    //                                        String.format("%.2f", totalOriginalBytes.get() / (1024.0 * 1024.0)));
    
    
    //                            zippingTask.setProgress(processedFiles * 100 / totalFiles);
                                    int progress = totalFileSize > 0 ? (int) (zippedFileSize * 100 / totalFileSize) : 100; // guard against zero-size folders
                                    zippingTask.setProgress(progress);
                                    zippingTaskService.save(zippingTask);
    //                                log.debug("Updated progress for task {}: {}%", zippingTask.getId(), progress);
                                }
    
                            }
                        }
    
                        zipOut.finish();
                        log.info("📦 ZIP creation finished - Original data size: {} MB",
                                String.format("%.2f", totalOriginalBytes.get() / (1024.0 * 1024.0)));
                    } catch (Exception e) {
                        zipFailed.set(true);
                        zipException.set(e);
                        log.error("❌ Error zipping: {}", e.getMessage(), e);
                    } finally {
                        try {
                            pipedOut.close();
                        } catch (IOException ignore) {
                        }
                        zipCompletionLatch.countDown();
                    }
                }, "zip-thread-" + zipKey);
    
                Thread uploadThread = new Thread(() -> {
                    List<CompletedPart> completedParts = new ArrayList<>();
                    int partNumber = 1;
                    long uploadedBytes = 0;
                    long startTime = System.currentTimeMillis();
    
                    try {
                        // Use chunked upload - AWS requires minimum 5MB parts except for last part
                        final int PART_SIZE = 10 * 1024 * 1024; // 10MB chunks
                        byte[] buffer = new byte[PART_SIZE];
    
                        int bytesRead;
                        while ((bytesRead = readFully(pipedIn, buffer, 0, buffer.length)) > 0) {
                            ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead);
    
                            // Upload part
                            UploadPartResponse uploadPartResponse = s3Handler.getS3Client().uploadPart(
                                    UploadPartRequest.builder()
                                            .bucket(s3Handler.getBucketName())
                                            .key(zipKey)
                                            .uploadId(uploadId)
                                            .partNumber(partNumber)
                                            .contentLength((long) bytesRead)
                                            .build(),
                                    RequestBody.fromByteBuffer(byteBuffer)
                            );
    
                            completedParts.add(
                                    CompletedPart.builder()
                                            .partNumber(partNumber)
                                            .eTag(uploadPartResponse.eTag())
                                            .build()
                            );
    
                            uploadedBytes += bytesRead;
                            totalCompressedBytes.set(uploadedBytes);
    
                            log.debug("Uploaded part {} - {} MB (total: {} MB)",
                                    partNumber,
                                    String.format("%.2f", bytesRead / (1024.0 * 1024.0)),
                                    String.format("%.2f", uploadedBytes / (1024.0 * 1024.0)));
    
                            partNumber++;
                        }
    
                        // Complete the multipart upload
                        s3Handler.getS3Client().completeMultipartUpload(
                                CompleteMultipartUploadRequest.builder()
                                        .bucket(s3Handler.getBucketName())
                                        .key(zipKey)
                                        .uploadId(uploadId)
                                        .multipartUpload(
                                                CompletedMultipartUpload.builder()
                                                        .parts(completedParts)
                                                        .build()
                                        )
                                        .build()
                        );
    
                        long endTime = System.currentTimeMillis();
                        double uploadSizeMB = uploadedBytes / (1024.0 * 1024.0);
                        double durationSeconds = (endTime - startTime) / 1000.0;
                        double mbPerSecond = durationSeconds > 0 ? uploadSizeMB / durationSeconds : 0;
                        double compressionRatio = totalOriginalBytes.get() > 0 ?
                                (double) uploadedBytes / totalOriginalBytes.get() * 100 : 0;
    
                        log.info("📤 Upload complete: {} - Original: {} MB, Compressed: {} MB ({}%), Duration: {} seconds, Speed: {} MB/s",
                                zipKey,
                                String.format("%.2f", totalOriginalBytes.get() / (1024.0 * 1024.0)),
                                String.format("%.2f", uploadSizeMB),
                                String.format("%.1f", compressionRatio),
                                String.format("%.2f", durationSeconds),
                                String.format("%.2f", mbPerSecond));
                    } catch (Exception e) {
                        log.error("❌ Error uploading: {}", e.getMessage(), e);
    
                        // Abort the multipart upload on failure
                        try {
                            s3Handler.getS3Client().abortMultipartUpload(
                                    AbortMultipartUploadRequest.builder()
                                            .bucket(s3Handler.getBucketName())
                                            .key(zipKey)
                                            .uploadId(uploadId)
                                            .build()
                            );
                        } catch (Exception abortException) {
                            log.error("Failed to abort multipart upload: {}", abortException.getMessage());
                        }
                    } finally {
                        try {
                            pipedIn.close();
                        } catch (IOException ignore) {
                        }
                    }
                }, "upload-thread-" + zipKey);
    
                zipThread.setPriority(Thread.MAX_PRIORITY);
                uploadThread.setPriority(Thread.MAX_PRIORITY);
    
                uploadThread.start();
                zipThread.start();
    
                zipThread.join();
                uploadThread.join();
    
                // Surface any zip-thread failure before marking the task complete and sending the email
                if (zipFailed.get() && zipException.get() != null) {
                    throw zipException.get();
                }

                String zipPreSignedUrl = s3Handler.generatePreSignedUrlForSingleFileWithExpirationDuration(zipKey, Duration.ofDays(zipExpirationDays));
    
                // Update task status
                zippingTask.setStatus(ZipTaskStatus.COMPLETE);
                zippingTask.setZipS3Key(zipKey);
                zippingTask.setPresignedUrl(zipPreSignedUrl);
                zippingTask.setExpiresAt(LocalDateTime.now(ZoneOffset.UTC).plusDays(zipExpirationDays));
                zippingTaskService.save(zippingTask);
    
                // Send email
                String userEmail = zippingTask.getUser().getEmail();
                emailServices.sendZipCompletionEmail(userEmail, folderName, zipPreSignedUrl);
    
    
            } catch (Exception e) {
                log.error("💥 Failed zip+upload: {}", e.getMessage(), e);
                throw new RuntimeException("Failed to zip and upload to S3", e);
            }
        }
    
        // Helper method to read a full chunk from an input stream
        private int readFully(InputStream input, byte[] buffer, int offset, int length) throws IOException {
            int bytesRead = 0;
            int result;
            while (bytesRead < length) {
                result = input.read(buffer, offset + bytesRead, length - bytesRead);
                if (result == -1) {
                    break;
                }
                bytesRead += result;
            }
            return bytesRead;
        }
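
  • For completeness, here is a minimal sketch of how this long-running method might be triggered without blocking the request thread, assuming Spring's async support is enabled with @EnableAsync on a configuration class. The ZipController, ZipStreamingService, and createTaskForFolder names are hypothetical and are not part of the code above:

    import org.springframework.http.ResponseEntity;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    // ZipStreamingService.java (separate file in practice)
    @Service
    public class ZipStreamingService {

        // Runs on Spring's task executor so the HTTP thread is not blocked while
        // the zip and multipart upload (which can take minutes) are in progress.
        @Async
        public void startZipJob(ZippingTask zippingTask) {
            streamFromS3ToS3ViaZip(zippingTask);
        }

        public void streamFromS3ToS3ViaZip(ZippingTask zippingTask) {
            // ... implementation shown above ...
        }
    }

    // ZipController.java (separate file in practice)
    @RestController
    @RequestMapping("/zip-tasks")
    public class ZipController {

        private final ZipStreamingService zipStreamingService;
        private final ZippingTaskService zippingTaskService;

        public ZipController(ZipStreamingService zipStreamingService,
                             ZippingTaskService zippingTaskService) {
            this.zipStreamingService = zipStreamingService;
            this.zippingTaskService = zippingTaskService;
        }

        // Creates the task row, starts the async job, and returns 202 immediately;
        // the client polls the task status or waits for the completion email.
        @PostMapping("/{folderId}")
        public ResponseEntity<ZippingTask> requestZip(@PathVariable Long folderId) {
            ZippingTask task = zippingTaskService.createTaskForFolder(folderId); // hypothetical helper
            zipStreamingService.startZipJob(task);
            return ResponseEntity.accepted().body(task);
        }
    }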