Tags: java, amazon-s3, stream, zip, zip4j

Generating a big zip with streams (with zip4j) and uploading it to S3


I'm working on generating a zip file that has to compress around 2000 documents, around 1 GB in total, and after that upload the zip file to an S3 bucket.

I'm using net.lingala.zip4j, which is a really nice Java library for handling zip files. Based on the documentation (https://github.com/srikanth-lingala/zip4j), I'm using the stream handling part of it. The code looks almost identical to the one from the documentation:

public ByteArrayOutputStream compress(FileCompressingContext fileCompressingContext) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (ZipOutputStream zos = new ZipOutputStream(baos)) {
        if (fileCompressingContext.getFiles() != null) {
            for (FileCompressingContext.File file : fileCompressingContext.getFiles()) {
                addFileToZip(zos, file.getContent(), file.getName());
            }
        }

        if (fileCompressingContext.getFolders() != null) {
            for (FileCompressingContext.Folder folder : fileCompressingContext.getFolders()) {
                for (FileCompressingContext.File file : folder.getFiles()) {
                    addFileToZip(zos, file.getContent(), folder.getName() + "/" + file.getName());
                }
            }
        }
    }

    return baos;
}

private void addFileToZip(ZipOutputStream zos, byte[] file, String fileName) throws IOException {
    byte[] buff = new byte[4096];
    int readLen;

    ZipParameters zp = new ZipParameters();
    zp.setFileNameInZip(fileName);
    zos.putNextEntry(zp);
    try (InputStream is = new ByteArrayInputStream(file)) {
        while ((readLen = is.read(buff)) != -1) {
            zos.write(buff, 0, readLen);
        }
    }

    zos.closeEntry();
}

The problem is that zos.closeEntry() throws java.lang.OutOfMemoryError: Java heap space after about 1000 documents have been compressed:

java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3745) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156) ~[na:na]
at net.lingala.zip4j.io.outputstream.CountingOutputStream.write(CountingOutputStream.java:29) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.ZipEntryOutputStream.write(ZipEntryOutputStream.java:33) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.CipherOutputStream.write(CipherOutputStream.java:50) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.CompressedOutputStream.write(CompressedOutputStream.java:26) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.DeflaterOutputStream.deflate(DeflaterOutputStream.java:55) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.DeflaterOutputStream.closeEntry(DeflaterOutputStream.java:63) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.ZipOutputStream.closeEntry(ZipOutputStream.java:108) ~[zip4j-2.9.1.jar:na]

Do you think there is a way to incrementally stream the zip archive to S3 as it is being generated? I mean something like periodically taking the contents of the ByteArrayOutputStream, uploading them to S3, and then resetting the baos.

If not, what are the alternatives? Writing to disk, then reading the file and uploading it to S3? Or maybe compressing in batches?
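
To make the disk-based alternative concrete, I imagine something roughly like this (just a sketch; the bucket name and key are placeholders, and the upload uses the AWS SDK v2 putObject overload that takes a file path, so the whole archive never has to sit in memory):

import net.lingala.zip4j.io.outputstream.ZipOutputStream;
import net.lingala.zip4j.model.ZipParameters;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class DiskZipUploader {

    // zip everything to a temp file on disk, then upload the finished file to S3
    public void compressToDiskAndUpload(List<FileCompressingContext.File> files, S3Client s3) throws IOException {
        Path zipPath = Files.createTempFile("export", ".zip");
        try (ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(zipPath.toFile()))) {
            for (FileCompressingContext.File file : files) {
                ZipParameters zp = new ZipParameters();
                zp.setFileNameInZip(file.getName());
                zos.putNextEntry(zp);
                zos.write(file.getContent());
                zos.closeEntry();
            }
        }

        // putObject streams the file from disk instead of keeping it in memory
        s3.putObject(PutObjectRequest.builder()
                        .bucket("my-bucket")
                        .key("file.zip")
                        .build(),
                zipPath);

        Files.deleteIfExists(zipPath);
    }
}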

Just out of curiosity, I tried processing the documents in batches: something like after every 100 documents, write them into the zip, and repeat the process. The issue here was that every batch of 100 documents would overwrite the existing zip, so this wasn't working either. I tried calling this for every 100 documents:

new ZipFile("test.zip").addStream(new ByteArrayInputStream(baos_lisb.toByteArray()), zp);

but as I said, it overwrites the zip content instead of appending to it.

Thanks in advance.


Solution

  • Funny enough, the OutOfMemoryError during the zip generation happened only on the local machine.

    In the testing environment, I got an OutOfMemoryError during the retrieval of the documents, a step before the generation, so Hibernate was complaining too. This probably happened because the local machine has 16GB of RAM and the testing environment only 1GB.

    So the solution was built based on the following steps:

    1. Retrieve the files in batches with Hibernate, and flush/clear the transactional EntityManager, in order to force Hibernate not to keep all the files in memory. The batch size was 50 documents (see the sketch after this list).
    2. Adapt the zip4j compression code to the AWS multipart upload, in order to compress and upload only one batch of files at a time, and reset the buffers afterwards to avoid OutOfMemory.
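
    As an illustration of step 1, here is a rough, hedged sketch of what the batched retrieval could look like (not the exact production code): Document, the JPQL query and the wiring are placeholders, and EntityManager/@PersistenceContext come from jakarta.persistence (or javax.persistence). The important part is calling flush() and clear() after each batch, so already-processed entities can be garbage collected.

    private static final int BATCH_SIZE = 50;

    @PersistenceContext
    private EntityManager entityManager;

    // fetches (at most) one batch of documents by primary key and detaches them from
    // the persistence context afterwards, so Hibernate does not keep every loaded
    // document (and its content) in memory for the whole export
    public List<Document> fetchBatch(List<Long> docPks) {
        List<Long> batchPks = docPks.subList(0, Math.min(BATCH_SIZE, docPks.size()));
        List<Document> batch = entityManager
                .createQuery("select d from Document d where d.id in :ids", Document.class)
                .setParameter("ids", batchPks)
                .getResultList();

        entityManager.flush();
        entityManager.clear();

        return batch;
    }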

    Step 2 was designed and adapted based on: https://www.bennadel.com/blog/3971-generate-and-incrementally-stream-a-zip-archive-to-amazon-s3-using-multipart-uploads-in-lucee-cfml-5-3-7-47.htm

    So the code from the initial question became as follows:

    @Override
    public void compressAndPublish(final FileCompressingContext fileCompressingContext) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ZipOutputStream zos = new ZipOutputStream(baos);

        if (fileCompressingContext.getFiles() != null) {
            for (FileCompressingContext.File file : fileCompressingContext.getFiles()) {
                addFileToZip(zos, file.getContent(), file.getName());
            }
        }

        if (fileCompressingContext.getFolders() != null) {
            // 1. initialize the multipart upload
            String uploadId = fileHandlerService.initialiseMultiPart(FileHandlerContext.builder()
                    .id(fileCompressingContext.getTaskId())
                    .root(bucket)
                    .fileName("file.zip")
                    .build());

            int partNumber = 0;
            int docNr = 0;
            List<CompletedPart> completedParts = new ArrayList<>();

            for (FileCompressingContext.Folder folder : fileCompressingContext.getFolders()) {
                while (!folder.getDocPks().isEmpty()) {
                    extractDocuments(folder, fileCompressingContext);

                    for (FileCompressingContext.File file : folder.getFiles()) {
                        if (baos.size() > PART_SIZE) {
                            log.debug("Id:{} - Preparing for upload part:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);
                            FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
                                    .id(fileCompressingContext.getTaskId())
                                    .root(bucket)
                                    .fileName("file.zip")
                                    .fileContent(baos.toByteArray())
                                    .build();
                            // 2. upload one part of the zip
                            completedParts.add(fileHandlerService.uploadPart(fileHandlerContext, uploadId, partNumber));

                            partNumber++;
                            baos.reset();
                        }

                        addFileToZip(zos, file.getContent(), folder.getName() + "/" + file.getName());
                        docNr++;
                    }

                    folder.getFiles().clear();
                }
            }

            finalizeZipContent(zos, baos);

            // 3. check whether there is any data left (the last part may be under 5 MB)
            if (baos.size() != 0) {
                log.debug("Id:{} - Preparing LAST upload part:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);

                FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
                        .id(fileCompressingContext.getTaskId())
                        .root(bucket)
                        .fileName("file.zip")
                        .fileContent(baos.toByteArray())
                        .build();
                completedParts.add(fileHandlerService.uploadPart(fileHandlerContext, uploadId, partNumber));
            }

            // 4. finish the multipart operation
            FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
                    .id(fileCompressingContext.getTaskId())
                    .root(bucket)
                    .fileName("file.zip")
                    .build();
            fileHandlerService.finishMultipartUpload(fileHandlerContext, uploadId, completedParts);

            log.debug("Id:{} - Multipart upload finished with partNr:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);
        } else {
            finalizeZipContent(zos, baos);

            FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
                    .id(fileCompressingContext.getTaskId())
                    .root(bucket)
                    .fileName("file.zip")
                    .fileContent(baos.toByteArray())
                    .fileExtension("application/zip")
                    .build();
            fileHandlerService.store(fileHandlerContext);
        }
    }

    So the only things that changed were the integration with the AWS multipart upload, which allows uploading big data in chunks, and the resetting of the buffer after every upload with baos.reset().
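
    For reference, here is a rough sketch of what the fileHandlerService multipart methods could look like on top of the AWS SDK for Java v2 S3Client. This is an assumption-laden illustration, not the exact implementation: the FileHandlerContext getters (getRoot(), getFileName(), getFileContent()) are assumed, and since S3 part numbers must be between 1 and 10000, the 0-based counter from the code above is shifted by one.

    import software.amazon.awssdk.core.sync.RequestBody;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.*;

    import java.util.List;

    public class S3FileHandlerService {

        private final S3Client s3; // configured/injected elsewhere

        public S3FileHandlerService(S3Client s3) {
            this.s3 = s3;
        }

        // 1. start the multipart upload and return its uploadId
        public String initialiseMultiPart(FileHandlerContext ctx) {
            return s3.createMultipartUpload(CreateMultipartUploadRequest.builder()
                            .bucket(ctx.getRoot())
                            .key(ctx.getFileName())
                            .build())
                    .uploadId();
        }

        // 2. upload one part (the current content of the ByteArrayOutputStream)
        public CompletedPart uploadPart(FileHandlerContext ctx, String uploadId, int partNumber) {
            int s3PartNumber = partNumber + 1; // S3 part numbers are 1-based
            UploadPartResponse response = s3.uploadPart(UploadPartRequest.builder()
                            .bucket(ctx.getRoot())
                            .key(ctx.getFileName())
                            .uploadId(uploadId)
                            .partNumber(s3PartNumber)
                            .build(),
                    RequestBody.fromBytes(ctx.getFileContent()));
            return CompletedPart.builder()
                    .partNumber(s3PartNumber)
                    .eTag(response.eTag())
                    .build();
        }

        // 4. complete the upload so S3 assembles the parts into the final zip object
        public void finishMultipartUpload(FileHandlerContext ctx, String uploadId, List<CompletedPart> completedParts) {
            s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
                    .bucket(ctx.getRoot())
                    .key(ctx.getFileName())
                    .uploadId(uploadId)
                    .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
                    .build());
        }
    }

    Note that every part except the last must be at least 5 MB, which is why the code above only cuts a part once baos.size() exceeds PART_SIZE and uploads whatever remains as the final part.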

    Another important step is this method:

    private void finalizeZipContent(ZipOutputStream zos, ByteArrayOutputStream baos) throws IOException {
        zos.flush();
        zos.close();
        baos.close();
    }
    

    which closes the ZipOutputStream and the ByteArrayOutputStream. If this step is not done at the end, the zip will look like a corrupt one.

    Also, the method addFileToZip(...) can be written much more simply:

    private void addFileToZip(ZipOutputStream zos, byte[] file, String fileName) throws IOException {
        ZipParameters zp = new ZipParameters();
        zp.setFileNameInZip(fileName);
        zos.putNextEntry(zp);
        zos.write(file);
        zos.closeEntry();
        zos.flush();
    }
    

    since it's not required to define that fixed-size byte array buffer.

    I really hope this will help someone and save some time. Cheers!