java groovy stream apache-nifi zip4j

NiFi ExecuteScript output is corrupted using zip4j


@Grab('net.lingala.zip4j:zip4j:2.2.8')
import net.lingala.zip4j.io.outputstream.ZipOutputStream;
import net.lingala.zip4j.model.ZipParameters;
import net.lingala.zip4j.model.enums.EncryptionMethod;

import org.apache.commons.io.IOUtils

// Script body for the ExecuteScript processor: replaces the flow file content
// with a password-protected zip that holds the original content as one entry.
// `session` and `REL_SUCCESS` are not defined in this script — presumably they
// are supplied by the ExecuteScript binding; confirm against the NiFi docs.
flowFile = session.get()
// Nothing to process in this invocation.
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
    // Read the complete incoming content into memory.
    // NOTE: inputStream is never closed in this version of the script.
    byte[] inputByteArray = IOUtils.toByteArray(inputStream)

    // The archive is assembled in an in-memory buffer rather than written
    // directly to the flow file's outputStream.
    ByteArrayOutputStream outputByteStream = new ByteArrayOutputStream()
    ZipOutputStream outputZipStream = new ZipOutputStream(outputByteStream, "password".toCharArray())

    //init the zip parameters
    ZipParameters zipParams = new ZipParameters()
    zipParams.setEncryptFiles(true)
    zipParams.setEncryptionMethod(EncryptionMethod.AES)
    zipParams.setFileNameInZip("records.csv")

    // Write the single entry described by zipParams.
    outputZipStream.putNextEntry(zipParams)
    outputZipStream.write(inputByteArray)
    outputZipStream.closeEntry()

    // Both streams are closed before the buffer is read back below.
    outputZipStream.close()
    outputByteStream.close()

    // Copy the buffered archive bytes into the flow file content.
    outputStream.write(outputByteStream.toByteArray())
} as StreamCallback)

// Route the rewritten flow file to the success relationship.
session.transfer(flowFile, REL_SUCCESS)

I am zipping a CSV file with a password using an ExecuteScript processor whose script is written in Groovy. The ExecuteScript processor is able to pass the flowfile to the next processor without a problem, but the resulting file is corrupted. The PutSFTP processor complains with the following error. [image: enter image description here]

The file is stored via SFTP nonetheless, but I can't unzip it since it's corrupted. What is wrong with my code?


Solution

  • For anyone's reference: you need to close the inputStream manually after reading it with toByteArray. The following code should work:

    @Grab('net.lingala.zip4j:zip4j:2.2.8')
    import net.lingala.zip4j.io.outputstream.ZipOutputStream;
    import net.lingala.zip4j.model.ZipParameters;
    import net.lingala.zip4j.model.enums.EncryptionMethod;
    
    import org.apache.commons.io.IOUtils
    
    // Script body for the ExecuteScript processor: replaces the flow file
    // content with a password-protected, AES-encrypted zip that holds the
    // original content as a single entry named "records.csv".
    flowFile = session.get()
    // Nothing to process in this invocation.
    if(!flowFile) return
    
    flowFile = session.write(flowFile, {inputStream, outputStream ->
        // Read the complete incoming content into memory, then release the input.
        byte[] inputByteArray = IOUtils.toByteArray(inputStream)
        inputStream.close() // this is what was missing in the original code
    
        // Assemble the archive in an in-memory buffer; it is copied to the flow
        // file's outputStream only after the zip stream has been closed.
        ByteArrayOutputStream outputByteStream = new ByteArrayOutputStream()
        // NOTE(review): the password is hard-coded; consider supplying it from a
        // (sensitive) processor property instead of embedding it in the script.
        ZipOutputStream outputZipStream = new ZipOutputStream(outputByteStream, "password".toCharArray())
        try {
            //init the zip parameters
            ZipParameters zipParams = new ZipParameters()
            zipParams.setEncryptFiles(true)
            zipParams.setEncryptionMethod(EncryptionMethod.AES)
            zipParams.setFileNameInZip("records.csv")
    
            // Write the single entry described by zipParams.
            outputZipStream.putNextEntry(zipParams)
            outputZipStream.write(inputByteArray)
            outputZipStream.closeEntry()
        } finally {
            // Always close the zip stream, even when writing the entry fails,
            // so it is not left open on the error path. On success this close
            // must happen before the buffer is read back below.
            outputZipStream.close()
        }
    
        // Copy the buffered archive bytes into the flow file content.
        outputStream.write(outputByteStream.toByteArray())
    } as StreamCallback)
    
    // Route the rewritten flow file to the success relationship.
    session.transfer(flowFile, REL_SUCCESS)