I'm trying to read through multiple compressed tables that are 5GB+ in size in R. Because I don't have enough memory to load them all at once, I need to process them one chunk at a time: the first 1000 rows of each file, then the next 1000 rows of each file, and so on. In basically any language other than R, I know how to keep a file open with a cursor or file pointer saved. How can I do that here?
I'm currently doing something a lot like this:
library(data.table)
library(R.utils)  ## needed by fread() for gzipped input

inFiles <- c("file1.tsv.gz", "file2.tsv.gz", "file3.tsv.gz")
totallines <- 10000
chunksize <- 1000
iters <- 1
skip_val <- 0
max_iters <- ceiling(totallines/chunksize)
while (iters <= max_iters) {
  data <- lapply(inFiles, function(file) {
    data.table::fread(file, nrows=chunksize, skip=skip_val,
                      col.names=data_colnames, sep="\t")  ## data_colnames defined elsewhere
  })
  # Process the data in omitted code here
  # Move on to the next chunk
  iters <- iters + 1
  skip_val <- skip_val + chunksize
}
The problem is that these files are large and compressed, so the smaller the chunk size or the larger the file, the more of its time the program spends just reading: every time it reads the next chunk, it has to decompress and skip all of the previous lines again, so the total work grows quadratically with file length.
I looked at readr::read_delim_chunked, but am not sure how I could use it to iterate through many files at once.
You're looking for pipe(). When used inside a repeat loop, readLines() continues from the connection's current position; it doesn't restart gunzip or re-decompress previous content.
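A minimal sketch of the idea, using the first file from the question: because the pipe connection stays open, each readLines() call picks up exactly where the previous one stopped.
con <- pipe("gunzip -c file1.tsv.gz", open="r")
first5 <- readLines(con, n=5)  ## lines 1-5
next5 <- readLines(con, n=5)   ## lines 6-10: nothing re-read, nothing skipped
close(con)
Wrapped into a chunked reader: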
process_chunks <- \(x, total.lines=1e5, chunk.size=1e3) {
  n_chunks <- ceiling(total.lines/chunk.size)
  unix <- identical(.Platform$OS.type, "unix")
  ## open pipe
  if (!unix) {
    con <- pipe(sprintf("7z e -so %s", shQuote(x)), open="r")  ## Windows fallback (not tested)
  } else {
    con <- pipe(sprintf("gunzip -c %s", shQuote(x)), open="r")
  }
  on.exit(try(close(con), silent=TRUE))  ## ensure pipe is closed gracefully on exit
  res_list <- vector(mode='list', length=n_chunks)
  i <- 1
  repeat {
    lins <- readLines(con, n=chunk.size)
    if (length(lins) == 0) break  ## EOF reached
    df <- data.table::fread(text=lins)
    ## Process data, save in list
    res_list[[i]] <- colSums(df)
    ## ++++++++++++++++++++++++++
    i <- i + 1
  }
  do.call(rbind, res_list)  ## rbind result
}
Note: the solution as-is assumes the .tsv files contain only data, no header (the extended version below supports headers).
Single file:
> process_chunks("foo1.tsv.gz") |> head()
V1 V2 V3 V4
[1,] -25.824427 -38.1319442 -15.260574 11.32532
[2,] -5.317994 -66.8804838 -3.754295 40.01791
[3,] -3.206987 -0.4199584 31.328836 11.47539
[4,] -21.786821 36.2002708 -25.986968 -12.03419
[5,] -15.829041 -5.8027936 -25.947610 26.12207
[6,] 23.008565 34.1792188 71.192981 -13.35848
Multiple files:
> in_Files <- c("foo1.tsv.gz", "foo2.tsv.gz", "foo3.tsv.gz")
> lapply(in_Files, process_chunks, total.lines=1e5, chunk.size=1e3) |> lapply(head)
[[1]]
V1 V2 V3 V4
[1,] -25.824427 -38.1319442 -15.260574 11.32532
[2,] -5.317994 -66.8804838 -3.754295 40.01791
[3,] -3.206987 -0.4199584 31.328836 11.47539
[4,] -21.786821 36.2002708 -25.986968 -12.03419
[5,] -15.829041 -5.8027936 -25.947610 26.12207
[6,] 23.008565 34.1792188 71.192981 -13.35848
[[2]]
V1 V2 V3 V4
[1,] -25.824427 -38.1319442 -15.260574 11.32532
[2,] -5.317994 -66.8804838 -3.754295 40.01791
[3,] -3.206987 -0.4199584 31.328836 11.47539
[4,] -21.786821 36.2002708 -25.986968 -12.03419
[5,] -15.829041 -5.8027936 -25.947610 26.12207
[6,] 23.008565 34.1792188 71.192981 -13.35848
[[3]]
V1 V2 V3 V4
[1,] -25.824427 -38.1319442 -15.260574 11.32532
[2,] -5.317994 -66.8804838 -3.754295 40.01791
[3,] -3.206987 -0.4199584 31.328836 11.47539
[4,] -21.786821 36.2002708 -25.986968 -12.03419
[5,] -15.829041 -5.8027936 -25.947610 26.12207
[6,] 23.008565 34.1792188 71.192981 -13.35848
On Linux we might use parallel::mclapply:
parallel::mclapply(in_Files, process_chunks, mc.cores=parallel::detectCores() - 1)
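Each forked worker opens its own pipe, so the files are processed independently. If you instead need the i-th chunk of every file at the same time, as in the question, the same trick extends to several connections read in lockstep. A minimal sketch under that assumption (process_in_lockstep and FUN are hypothetical names; colSums again stands in for the real per-chunk work):
process_in_lockstep <- \(files, chunk.size=1e3, FUN=colSums) {
  ## one open pipe per file, all closed on exit
  cons <- lapply(files, \(f) pipe(sprintf("gunzip -c %s", shQuote(f)), open="r"))
  on.exit(lapply(cons, \(con) try(close(con), silent=TRUE)))
  res <- list()
  i <- 1
  repeat {
    ## read chunk i from every file side by side
    chunks <- lapply(cons, readLines, n=chunk.size)
    if (all(lengths(chunks) == 0)) break
    res[[i]] <- lapply(chunks[lengths(chunks) > 0],
                       \(lins) FUN(data.table::fread(text=lins)))
    i <- i + 1
  }
  res  ## res[[i]] holds the per-file results for chunk i
}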
No need to specify total lines; a flexible function (FX) is applied per chunk, metadata lines (skip) can be skipped, and a header is supported. The shell command (unz) is customizable for any decompression tool. Matrix calculations are supported by default, and a warning is issued if the last chunk is smaller than expected.
process_chunks2 <- \(x, FX, csz=1e3, skip=0L, header=FALSE, matrix=TRUE,
                     unz='gunzip -c', warn=TRUE, ...) {
  unix <- identical(.Platform$OS.type, "unix")
  xq <- shQuote(x, if (!unix) 'cmd' else 'sh')
  con <- pipe(sprintf("%s %s", unz, xq), open="r")  ## open pipe
  on.exit(try(close(con), silent=TRUE))  ## ensure pipe is closed gracefully on exit
  res_list <- list()
  i <- 1
  if (skip > 0L) {
    readLines(con, n=skip)  ## discard metadata lines
  }
  if (header) {
    hd <- colnames(data.table::fread(text=readLines(con, n=1)))
  }
  repeat {
    lins <- readLines(con, n=csz)
    if (length(lins) == 0) break
    ch <- data.table::fread(text=lins)
    if (matrix) {
      ch <- as.matrix(ch)
    }
    if (warn && (nr <- nrow(ch)) < csz) {
      warning(sprintf("Final chunk short: %d < %d", nr, csz))
    }
    res_list[[i]] <- FX(ch, ...)  ## process chunk
    i <- i + 1
  }
  out <- do.call(rbind, res_list)  ## rbind result
  if (header) {
    `colnames<-`(out, hd)
  } else {
    `colnames<-`(out, NULL)
  }
}
> process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=6, header=FALSE) |> head(2)
[,1] [,2] [,3] [,4]
[1,] -0.025824427 -0.03813194 -0.015260574 0.01132532
[2,] -0.005317994 -0.06688048 -0.003754295 0.04001791
> process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=5, header=TRUE) |> head(2)
A1 A2 A3 A4
[1,] -0.025824427 -0.03813194 -0.015260574 0.01132532
[2,] -0.005317994 -0.06688048 -0.003754295 0.04001791
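Since unz is just a shell command, any decompressor that writes to stdout works; for instance, for an xz-compressed file (baz.tsv.xz is a hypothetical file; xz must be on the PATH):
> process_chunks2(x='baz.tsv.xz', FX=matrixStats::colMeans2, unz='xz -dc')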
Example where total rows are not divisible by chunk size (e.g., m <- 1e5 - 1 in Data, infra):
> process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=6, header=FALSE) |> head(2)
[,1] [,2] [,3] [,4]
[1,] -0.025824427 -0.03763184 -0.01190839 0.01348543
[2,] -0.005317994 -0.06963092 -0.00367911 0.03837964
Warning message:
In process_chunks2(x = "bar.tsv.gz", FX = matrixStats::colMeans2, :
Final chunk short: 999 < 1000
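The warning is purely informational; pass warn=FALSE to silence it:
> process_chunks2(x='bar.tsv.gz', FX=matrixStats::colMeans2, skip=6, warn=FALSE)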
Data:
(For Linux. Eight files will be created in the current directory.)
m <- 1e5; n <- 4
set.seed(42)
mat <- matrix(rnorm(m*n), m, n)
mat |>
  write.table('foo.tsv', row.names=FALSE, col.names=FALSE, sep='\t')
system('pigz -p 7 -f foo.tsv')  ## pigz = parallel gzip; plain gzip also works
system('for i in 1 2 3; do cp foo.tsv.gz foo${i}.tsv.gz; done')
mat |>
  `colnames<-`(paste0('A', seq_len(n))) |>
  data.table::fwrite('bar.tmp', row.names=FALSE, col.names=TRUE, sep='\t')
writeLines(c(
  "# File: bar.tsv.gz",
  "# Created: 2025-04-06",
  "# Rows: 100000 (approx.)",
  "# Delimiter: tab",
  "# Generator: R/data.table::fwrite()"
), "meta.tmp")
system("cat meta.tmp bar.tmp > bar.tsv")  ## prepend metadata lines
file.remove("meta.tmp", "bar.tmp")
system('pigz -p 7 -f bar.tsv')
system('for i in 1 2 3; do cp bar.tsv.gz bar${i}.tsv.gz; done')