java, kotlin, tar, apache-commons-compress

How to process a large tar file entirely in memory?


I am trying to build an endpoint in Kotlin which accepts 10+ GB tar files, and processes the contents one by one.

The tar file contains millions of JSON files, and I am running this application in a Docker container with very limited disk space, so extracting the entire archive to a temporary directory is not an option.

The following approach with Apache Commons Compress:

post(...) {
    val multipart = call.receiveMultipart()
    multipart.forEachPart { part ->
        if (part is PartData.FileItem) {
            part.streamProvider().use { inputStream ->
                BufferedInputStream(inputStream).use { bufferedInputStream ->
                    TarArchiveInputStream(bufferedInputStream).use { tarInput ->
                        // iterate over tar entries here
                    }
                }
            }
        }
    }
}

leads to a `java.io.IOException: Corrupted TAR archive.` error, apparently because the tar data is provided as a stream instead of one huge variable containing all the bytes. I also cannot read the entire input stream into a single ByteArray and pass that to BufferedInputStream, because I don't have 20 GB of memory.

Any help is appreciated.

The example code doesn't rely on anything specific to Kotlin or Ktor.

Update

A longer example, sending the file as the POST body:

call.receiveStream().buffered().use { bufferedInputStream ->
    TarArchiveInputStream(bufferedInputStream).use { tarInput ->
        var entry = tarInput.nextEntry
        while (entry != null) {
            if (!entry.isDirectory && entry.name.endsWith(".json")) {
                scope.launch {
                    val jsonString = tarInput.bufferedReader().readText()
                    val json: Map<String, JsonElement> =
                        Json.parseToJsonElement(jsonString).jsonObject
                    // ... process json ...
                }
            }
            entry = tarInput.nextEntry
        }
    }
}

Update after Answer

// any stream that implements java.io.InputStream
val bodyStream = call.receiveStream()
val elems = sequence {
    bodyStream.buffered().use { bufferedInputStream ->
        TarArchiveInputStream(bufferedInputStream).use { tarInput ->
            while (true) {
                val entry = tarInput.nextEntry ?: break
                // read something from the entry here, yield it,
                // and process it later outside the sequence
                yield(entry.name to tarInput.readBytes())
            }
        }
    }
}

The problem was processing the tar asynchronously; it's explained in detail in the accepted answer.
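The reason this pattern works is that `sequence { }` is lazy: the `use` blocks stay open while the caller iterates and close once the sequence is exhausted. A self-contained sketch of the same shape, with a plain text stream standing in for the tar (the names here are illustrative, not from the original code):

```kotlin
fun lazyLines(): List<String> {
    val data = "alpha\nbeta\ngamma".byteInputStream()
    // Same shape as the tar version: the stream is opened inside the
    // sequence builder, read lazily, and closed once the consumer
    // exhausts the sequence.
    val elems = sequence {
        data.bufferedReader().use { reader ->
            while (true) {
                val line = reader.readLine() ?: break
                yield(line.uppercase())
            }
        }
    }
    return elems.toList() // the consumer drives the reads
}

fun main() {
    println(lazyLines())
}
```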


Solution

  • I guess the problem is caused by this line:

    scope.launch {
    

We can't process the input stream concurrently, because every call to nextEntry re-uses exactly the same input stream; it just seeks to another place. So all concurrent consumers actually consume from the same position.

We can only read the input stream sequentially. If the JSON parsing takes a lot of time and you still want to use multiple threads, you can first read the entries into byte arrays / strings sequentially, then parse them as JSON concurrently. But reading from the stream itself has to be sequential.
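A minimal sketch of that two-phase pattern (sequential read, then concurrent processing), assuming Apache Commons Compress is on the classpath. The archive is built in memory purely to keep the example self-contained, and the "parsing" step is a stand-in for real JSON parsing:

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import org.apache.commons.compress.archivers.tar.TarArchiveEntry
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream

// Build a tiny tar archive in memory, only so the example is self-contained.
fun buildSampleTar(): ByteArray =
    ByteArrayOutputStream().also { baos ->
        TarArchiveOutputStream(baos).use { tarOut ->
            for (i in 1..3) {
                val content = """{"id": $i}""".toByteArray()
                val entry = TarArchiveEntry("file$i.json").apply { size = content.size.toLong() }
                tarOut.putArchiveEntry(entry)
                tarOut.write(content)
                tarOut.closeArchiveEntry()
            }
        }
    }.toByteArray()

fun processTar(tarBytes: ByteArray): List<Pair<String, Int>> {
    // Step 1: read every entry SEQUENTIALLY from the single shared stream.
    val rawJsons = mutableListOf<Pair<String, String>>()
    TarArchiveInputStream(ByteArrayInputStream(tarBytes).buffered()).use { tarInput ->
        while (true) {
            val entry = tarInput.nextEntry ?: break
            if (!entry.isDirectory && entry.name.endsWith(".json")) {
                // readBytes() stops at the end of the current entry
                rawJsons += entry.name to tarInput.readBytes().decodeToString()
            }
        }
    }

    // Step 2: hand the in-memory strings to worker threads; no thread ever
    // touches the tar stream, so there is nothing to corrupt.
    val pool = Executors.newFixedThreadPool(4)
    val parsed = rawJsons
        .map { (name, json) -> pool.submit(Callable { name to json.length }) } // stand-in for parsing
        .map { it.get() }
    pool.shutdown()
    return parsed
}

fun main() {
    processTar(buildSampleTar()).forEach { (name, len) -> println("$name: $len bytes") }
}
```

The same split works with coroutines instead of an executor: collect the byte arrays in the reading loop, then `launch` one parsing job per collected element.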