Tags: r, csv, data.table, gzip, chunking

Reading through multiple files in chunks in R


I'm trying to read through multiple compressed tables that are 5GB+ in size in R. Because I don't have enough memory to load them all at once, I need to process them one chunk at a time: the first 1000 rows of each file, then the next 1000 rows of each file, and so on. In basically any other language I know how to keep a file open and hang on to a cursor or file pointer between reads. How can I do that here?

I'm currently doing something a lot like this:

library(data.table)
library(R.utils)

inFiles = c("file1.tsv.gz", "file2.tsv.gz", "file3.tsv.gz")
totallines <- 10000
chunksize <- 1000

iters          <- 1
skip_val       <- 0
max_iters      <- ceiling(totallines/chunksize)

while (iters <= max_iters) {

    
    data = lapply(inFiles, function(file) {
      data.table::fread(file, nrows=chunksize, skip=skip_val,
                        col.names=data_colnames, sep="\t")  # data_colnames holds the column names (defined elsewhere)
    })

    # Process the data in omitted code here

    # Move on to the next chunk
    iters    = iters + 1
    skip_val = skip_val + chunksize
}

The problem is that these files are large and compressed, so the smaller the chunk size or the larger the file, the more of its time the program spends just reading: every call to fread has to decompress and skip over all of the preceding lines again, so the total amount of data decompressed grows quadratically with the number of chunks.

I looked at readr::read_delim_chunked, but I'm not sure how I could use it to iterate through many files at once.
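
For a single file I can see roughly how it would go, something like this (untested sketch):

readr::read_delim_chunked(
  "file1.tsv.gz",
  callback = readr::DataFrameCallback$new(function(chunk, pos) {
    # process this chunk of file1 only
  }),
  delim = "\t", col_names = data_colnames, chunk_size = 1000
)

But the callback only ever sees chunks from that one file, so I don't see how to advance file2 and file3 in lockstep with file1.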


Solution

  • You're looking for pipe(). Successive readLines() calls on the same open connection (for example inside a repeat loop) continue from the current position, so gunzip is not restarted and previously read content is not decompressed again.
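
    A quick way to see that behaviour (a minimal sketch, assuming a gzipped TSV such as the foo1.tsv.gz built in the Data section at the end):

    con <- pipe("gunzip -c foo1.tsv.gz", open="r")
    first_chunk  <- readLines(con, n=1000)  ## lines 1-1000
    second_chunk <- readLines(con, n=1000)  ## lines 1001-2000; nothing is decompressed twice
    close(con)

    Wrapped into a chunked reader: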

    process_chunks <- \(x, total.lines=1e5, chunk.size=1e3) {
      n_chunks <- ceiling(total.lines/chunk.size)
      unix <- identical(.Platform$OS.type, "unix")
      ## open pipe
      if (!unix) {
        con <- pipe(sprintf("7z e -so %s", shQuote(x)), open="r")  ## Windows fallback (not tested)
      } else {
        con <- pipe(sprintf("gunzip -c %s", shQuote(x)), open="r")
      }
      on.exit(try(close(con), silent=TRUE))  ## ensure pipe is closed gracefully on exit
      res_list <- vector(mode='list', length=n_chunks)
      i <- 1
      repeat {
        lins <- readLines(con, n=chunk.size)
        if (length(lins) == 0) break
        df <- data.table::fread(text=lins)
        ## Process the data and save the result in the list
        ## (colSums() is just a placeholder for the real per-chunk processing)
        res_list[[i]] <- colSums(df)
        i <- i + 1
      }
      do.call(rbind, res_list)  ## rbind result
    }
    

    Note: as written, the solution assumes the .tsv files contain only data rows, no header (the Enhanced Alternative below handles a header and leading metadata lines).

    Usage

    Single file:

    > process_chunks("foo1.tsv.gz") |> head()
                 V1          V2         V3        V4
    [1,] -25.824427 -38.1319442 -15.260574  11.32532
    [2,]  -5.317994 -66.8804838  -3.754295  40.01791
    [3,]  -3.206987  -0.4199584  31.328836  11.47539
    [4,] -21.786821  36.2002708 -25.986968 -12.03419
    [5,] -15.829041  -5.8027936 -25.947610  26.12207
    [6,]  23.008565  34.1792188  71.192981 -13.35848
    

    Multiple files:

    > in_Files <- c("foo1.tsv.gz", "foo2.tsv.gz", "foo3.tsv.gz")
    > lapply(in_Files, process_chunks, total.lines=1e5, chunk.size=1e3) |> lapply(head)
    [[1]]
                 V1          V2         V3        V4
    [1,] -25.824427 -38.1319442 -15.260574  11.32532
    [2,]  -5.317994 -66.8804838  -3.754295  40.01791
    [3,]  -3.206987  -0.4199584  31.328836  11.47539
    [4,] -21.786821  36.2002708 -25.986968 -12.03419
    [5,] -15.829041  -5.8027936 -25.947610  26.12207
    [6,]  23.008565  34.1792188  71.192981 -13.35848
    
    [[2]]
                 V1          V2         V3        V4
    [1,] -25.824427 -38.1319442 -15.260574  11.32532
    [2,]  -5.317994 -66.8804838  -3.754295  40.01791
    [3,]  -3.206987  -0.4199584  31.328836  11.47539
    [4,] -21.786821  36.2002708 -25.986968 -12.03419
    [5,] -15.829041  -5.8027936 -25.947610  26.12207
    [6,]  23.008565  34.1792188  71.192981 -13.35848
    
    [[3]]
                 V1          V2         V3        V4
    [1,] -25.824427 -38.1319442 -15.260574  11.32532
    [2,]  -5.317994 -66.8804838  -3.754295  40.01791
    [3,]  -3.206987  -0.4199584  31.328836  11.47539
    [4,] -21.786821  36.2002708 -25.986968 -12.03419
    [5,] -15.829041  -5.8027936 -25.947610  26.12207
    [6,]  23.008565  34.1792188  71.192981 -13.35848
    

    On Linux we might use parallel::mclapply:

    parallel::mclapply(in_Files, process_chunks, mc.cores=parallel::detectCores() - 1)
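
    On Windows, where forking isn't available, a socket cluster should work as well (a sketch, not tested here; the workers still need gunzip/7z on the PATH):

    cl <- parallel::makeCluster(parallel::detectCores() - 1)
    parallel::clusterExport(cl, "process_chunks")  ## make the function visible on the workers
    res <- parallel::parLapply(cl, in_Files, process_chunks)
    parallel::stopCluster(cl)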
    

    Enhanced Alternative

    There is no need to specify the total number of lines; a user-supplied function (FX) is applied to each chunk, leading metadata lines can be skipped (skip), and a header row is supported (header). The shell command (unz) is customizable, so any decompression tool can be used. Chunks are converted to a matrix by default (matrix=TRUE), and a warning is issued if the last chunk is smaller than the chunk size.

    process_chunks2 <- \(x, FX, csz=1e3, skip=0L, header=FALSE, matrix=TRUE, 
                         unz='gunzip -c', warn=TRUE, ...) {
      unix <- identical(.Platform$OS.type, "unix")
      xq <- shQuote(x, if (!unix) 'cmd' else 'sh')
      con <- pipe(sprintf("%s %s", unz, xq), open="r")  ## open pipe
      on.exit(try(close(con), silent=TRUE))  ## ensure pipe is closed gracefully on exit
      res_list <- list()
      i <- 1
      if (skip > 0L) {
        readLines(con, n=skip)  ## discard leading metadata lines
      }
      if (header) {
        hd <- colnames(data.table::fread(text=readLines(con, n=1)))  ## capture column names from the header line
      }
      repeat {
        lins <- readLines(con, n=csz)
        if (length(lins) == 0) break
        ch <- data.table::fread(text=lins)
        if (matrix) {
          ch <- as.matrix(ch)
        }
        if (warn && (nr <- nrow(ch)) < csz) {
          warning(sprintf("Final chunk short: %d < %d", nr, csz))
        }
        res_list[[i]] <- FX(ch, ...)  ## process chunk
        i <- i + 1
      }
      out <- do.call(rbind, res_list)  ## rbind result
      if (header) {
        `colnames<-`(out, hd)
      } else {
        `colnames<-`(out, NULL)
      }
    }
    

    > process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=6, header=FALSE) |> head(2)
                 [,1]        [,2]         [,3]       [,4]
    [1,] -0.025824427 -0.03813194 -0.015260574 0.01132532
    [2,] -0.005317994 -0.06688048 -0.003754295 0.04001791
    > process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=5, header=TRUE) |> head(2)
                   A1          A2           A3         A4
    [1,] -0.025824427 -0.03813194 -0.015260574 0.01132532
    [2,] -0.005317994 -0.06688048 -0.003754295 0.04001791
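
    Since FX is arbitrary and any extra arguments are passed through via ..., other per-chunk summaries work the same way, for example (assuming matrixStats again; na.rm is forwarded to colSds):

    process_chunks2(x='bar.tsv.gz', FX=matrixStats::colSds, skip=5, header=TRUE, na.rm=TRUE)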
    

    Example where the total number of rows is not divisible by the chunk size (e.g., m <- 1e5 - 1 in the Data section below):

    > process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=6, header=FALSE) |> head(2)
                 [,1]        [,2]        [,3]       [,4]
    [1,] -0.025824427 -0.03763184 -0.01190839 0.01348543
    [2,] -0.005317994 -0.06963092 -0.00367911 0.03837964
    Warning message:
    In process_chunks2(x = "bar.tsv.gz", FX = matrixStats::colMeans2,  :
      Final chunk short: 999 < 1000
    

    Data:

    (For Linux. Eight files will be created in the current directory.)

    m <- 1e5; n <- 4
    set.seed(42)
    mat <- matrix(rnorm(m*n), m, n)
    mat |> 
      write.table('foo.tsv', row.names=FALSE, col.names=FALSE, sep='\t')
    system('pigz -p 7 -f foo.tsv')
    system('for i in 1 2 3; do cp foo.tsv.gz foo${i}.tsv.gz; done')
    
    mat |> 
      `colnames<-`(paste0('A', seq_len(n))) |> 
      data.table::fwrite('bar.tmp', row.names=FALSE, col.names=TRUE, sep='\t')
    writeLines(c(
      "# File:       bar.tsv.gz",
      "# Created:    2025-04-06",
      "# Rows:       100000 (approx.)",
      "# Delimiter:  tab",
      "# Generator:  R/data.table::fwrite()"
    ), "meta.tmp")
    system("cat meta.txt bar.tmp > bar.tsv")
    file.remove("meta.tmp", "bar.tmp")
    system('pigz -p 7 -f bar.tsv')
    system('for i in 1 2 3; do cp bar.tsv.gz bar${i}.tsv.gz; done')
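
    If pigz isn't installed, the pigz calls can be replaced with plain (single-threaded) gzip, e.g.:

    system('gzip -f foo.tsv')
    system('gzip -f bar.tsv')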