I have a list of ffdf objects that would take up about 76GB of RAM if they were loaded into memory instead of being kept on disk with the ff package. The following are their respective dimensions:
> ffdfs |> sapply(dim)
         [,1]     [,2]     [,3]      [,4]      [,5]      [,6]      [,7]
[1,] 11478746 12854627 10398332 404567958 490530023 540375993 913792256
[2,]        3        3        3         3         3         3         3
         [,8]     [,9]     [,10]     [,11]    [,12]     [,13]     [,14]
[1,] 15296863 11588739 547337574 306972654 11544523 255644408 556900805
[2,]        3        3         3         3        3         3         3
        [,15]     [,16]    [,17]
[1,] 13409223 900436690 15184264
[2,]        3         3        3
I want to check the number of duplicated rows in each ffdf, so I did the following:
check_duplication <- sample_cols |> sapply(function(df) {
  df[c("chr","pos")] |> duplicated() |> sum()
})
It works, but it is extremely slow.
I am on an HPC with about 110GB of RAM and 18 CPUs.
Are there any other options or settings I could adjust to speed up the process? Thank you.
Parallelization is a natural way to speed this up. It can be done at the C level via data.table:
library("data.table")
data.table 1.14.2 using 4 threads (see ?getDTthreads). Latest news: r-datatable.com
set.seed(1L)
x <- as.data.frame(replicate(2L, sample.int(100L, size = 1e+06L, replace = TRUE), simplify = FALSE))
y <- as.data.table(x)
microbenchmark::microbenchmark(duplicated(x), duplicated(y), times = 1000L)
Unit: milliseconds
          expr       min         lq       mean     median         uq       max neval
 duplicated(x) 449.27693 596.242890 622.160423 625.610267 644.682319 734.39741  1000
 duplicated(y)   5.75722   6.347518   7.413925   6.874593   7.407695  58.12131  1000
The benchmark here shows that duplicated is much faster when applied to a data.table than when applied to an equivalent data frame. How much faster depends in part on the number of threads that you make available to data.table (see ?setDTthreads).
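As a minimal sketch of the thread setting (toy data, hypothetical object names dt, n_dup, n_dup2), the snippet below asks data.table for 18 threads, the CPU count mentioned in the question, and then counts duplicated (chr, pos) rows in two equivalent ways. On a shared HPC node you would match the thread count to the CPUs your job was actually allocated rather than copying 18.

```r
library("data.table")

# Ask data.table for 18 threads (the CPU count from the question);
# adjust to the number of CPUs your HPC job was actually allocated.
setDTthreads(18L)
getDTthreads()  # reports how many threads data.table will use

# Toy stand-in for one of the large tables (hypothetical data)
dt <- data.table(chr = c(1L, 1L, 2L, 2L, 1L),
                 pos = c(10L, 10L, 20L, 30L, 10L))

# Rows that repeat an earlier (chr, pos) combination
n_dup <- sum(duplicated(dt, by = c("chr", "pos")))

# Equivalent: total rows minus the number of distinct (chr, pos) combinations
n_dup2 <- nrow(dt) - uniqueN(dt, by = c("chr", "pos"))

n_dup   # 2
n_dup2  # 2
```

The `by` argument restricts the comparison to the named columns, so the third column in the real tables would not need to be dropped first.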
If you go the data.table route, then you would process your 17 data frames like so:
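nduped <- function(ffd) {
  # Read only the two key columns from disk into an in-memory data frame
  x <- as.data.frame(ffd[c("chr", "pos")])
  # Convert to data.table by reference (no copy of the columns)
  setDT(x)
  # Count rows that repeat an earlier (chr, pos) pair
  n <- sum(duplicated(x))
  # Release the in-memory table before the next ffdf is read
  rm(x)
  gc(FALSE)
  n
}
vapply(list_of_ffd, nduped, 0L)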
Here, we are using setDT rather than as.data.table to perform an in-place coercion from data frame to data.table, and we are using rm and gc to free the memory occupied by x before reading another data frame into memory.
If, for whatever reason, data.table is not an option, then you can stick to using the duplicated method for data frames, namely duplicated.data.frame. It is not parallelized at the C level, so you would need to parallelize at the R level, using, e.g., mclapply to assign your 17 data frames to batches and process those batches in parallel:
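nduped <- function(ffd) {
  # Read the two key columns into memory as a plain data frame
  x <- as.data.frame(ffd[c("chr", "pos")])
  # duplicated() dispatches to duplicated.data.frame here
  n <- sum(duplicated(x))
  rm(x)
  gc(FALSE)
  n
}
# `...` stands for further mclapply arguments, such as mc.cores
unlist(parallel::mclapply(list_of_ffd, nduped, ...))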
This option is slower and consumes more memory than you might expect. Fortunately, there is room for optimization. The rest of this answer highlights some of the main issues and ways to get around them. Feel free to stop reading if you've already settled on data.table.
Since you have 18 CPUs, you can try to process all 17 data frames simultaneously, but you might encounter out-of-memory issues as a result of reading all 17 data frames into memory at once. Increasing the batch size (i.e., distributing the 17 jobs across fewer than 17 CPUs) should help.
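A small runnable sketch of the R-level approach, assuming plain in-memory data frames as stand-ins for the ffdf objects (make_toy, list_of_df and nduped_df are hypothetical names): four jobs are spread over two worker processes, so only two jobs run at a time. With real ffdf inputs, that would mean at most two tables loaded into RAM at once. mclapply relies on forking, so mc.cores > 1 does not work on Windows.

```r
library("parallel")

# Hypothetical stand-ins for the ffdf objects: small in-memory data frames
set.seed(1L)
make_toy <- function(n) {
  data.frame(chr = sample.int(3L, n, replace = TRUE),
             pos = sample.int(50L, n, replace = TRUE))
}
list_of_df <- lapply(c(1000L, 5000L, 2000L, 8000L), make_toy)

# Count rows that repeat an earlier (chr, pos) pair
nduped_df <- function(x) sum(duplicated(x[c("chr", "pos")]))

# 4 jobs, 2 worker processes: with the default mc.preschedule = TRUE the
# jobs are split into 2 batches up front and each worker runs its batch
# sequentially, so at most 2 jobs are in progress at any moment
counts <- unlist(mclapply(list_of_df, nduped_df, mc.cores = 2L))
counts
```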
Since your 17 data frames vary widely in length (number of rows), randomly assigning them to roughly equally sized batches is probably not a good strategy. You could decrease the overall run time by batching shorter data frames together and not batching longer data frames together. mclapply has an affinity.list argument giving you this control. Ideally, each batch should require the same amount of processing time.
The amount of memory that each job uses is actually at least two times greater than the amount needed to store the data frame x, because duplicated.data.frame copies its argument:
x <- data.frame(chr = rep(1:2, times = 5L), pos = rep(1:2, each = 5L))
tracemem(x)
[1] "<0x14babad48>"
invisible(duplicated(x))
tracemem[0x14babad48 -> 0x14babc088]: as.list.data.frame as.list vapply duplicated.data.frame duplicated
The copy happens inside the vapply call in the body of the method:
duplicated.data.frame
function (x, incomparables = FALSE, fromLast = FALSE, ...)
{
    if (!isFALSE(incomparables))
        .NotYetUsed("incomparables != FALSE")
    if (length(x) != 1L) {
        if (any(i <- vapply(x, is.factor, NA)))
            x[i] <- lapply(x[i], as.numeric)
        duplicated(do.call(Map, `names<-`(c(list, x), NULL)),
            fromLast = fromLast)
    }
    else duplicated(x[[1L]], fromLast = fromLast, ...)
}
<bytecode: 0x15b44f0f0>
<environment: namespace:base>
That vapply call is completely avoidable: you should already know whether chr and pos are factors. I would suggest defining a replacement for duplicated.data.frame that does only what is necessary given your use case. For example, if you know that chr and pos are not factors, then you might assign
duped <- function(x) {
  duplicated.default(do.call(Map, `names<-`(c(list, x), NULL)))
}
and compute sum(duped(x)) instead of sum(duplicated(x)). In fact, you could do slightly better by replacing list with c:
fastduped <- function(x) {
  duplicated.default(do.call(Map, `names<-`(c(c, x), NULL)))
}
Using c here causes rows of the data frame x to be stored and compared as atomic vectors rather than as lists. (If chr and pos have different types, c coerces each row to their common type, e.g., character; for integer positions this does not change which rows compare equal.) In other words, fastduped(x) is doing
duplicated.default(<length-'m' list of length-'n' atomic vectors>)
whereas duped(x) is doing
duplicated.default(<length-'m' list of length-'n' lists of length-1 atomic vectors>)
where m = nrow(x) and n = length(x). The latter is slower and consumes more memory, and there is a warning in ?duplicated saying as much:
Using this for lists is potentially slow, especially if the elements are not atomic vectors (see ‘vector’) or differ only in their attributes. In the worst case it is O(n^2).
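To make the two shapes concrete, this small sketch (toy data; rows_as_lists and rows_as_vectors are hypothetical names) builds the intermediate per-row objects that duped and fastduped pass to duplicated.default and checks that both functions flag the same rows as duplicated.data.frame.

```r
x <- data.frame(chr = c(1L, 1L, 2L, 2L, 1L), pos = c(10L, 10L, 20L, 30L, 10L))

duped <- function(x) {
  duplicated.default(do.call(Map, `names<-`(c(list, x), NULL)))
}
fastduped <- function(x) {
  duplicated.default(do.call(Map, `names<-`(c(c, x), NULL)))
}

# One element per row of x in both cases
rows_as_lists   <- do.call(Map, `names<-`(c(list, x), NULL))
rows_as_vectors <- do.call(Map, `names<-`(c(c, x), NULL))
str(rows_as_lists[[1L]])    # a list of two length-1 integer vectors
str(rows_as_vectors[[1L]])  # a single length-2 integer vector

duplicated(x)  # FALSE TRUE FALSE FALSE TRUE
duped(x)       # same result
fastduped(x)   # same result
```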
Computing sum(fastduped(x)) instead of sum(duplicated(x)) should increase the number of data frames that you can process simultaneously without running out of memory. FWIW, here is a benchmark comparing the run times of duplicated, duped, and fastduped (saying nothing about memory usage):
set.seed(1L)
x <- as.data.frame(replicate(2L, sample.int(100L, size = 1e+06L, replace = TRUE), simplify = FALSE))
microbenchmark::microbenchmark(duplicated(x), duped(x), fastduped(x), times = 1000L)
Unit: milliseconds
          expr      min       lq     mean   median       uq      max neval
 duplicated(x) 521.7263 598.9353 688.7286 628.8813 769.6100 1324.458  1000
      duped(x) 521.3863 598.7390 682.1298 627.1445 764.7331 1373.712  1000
  fastduped(x) 431.0359 528.6613 594.1534 553.7739 609.6241 1123.542  1000
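The benchmark covers only run time. One rough way to look at memory is to measure the intermediate per-row list that each approach builds, using utils::object.size. This sketch uses 1e5 rows rather than 1e6 to keep it quick; object.size counts every element separately, so treat the numbers as approximate. The names sizes, rows_as_lists and rows_as_vectors are hypothetical.

```r
set.seed(1L)
n <- 1e+05L
x <- data.frame(chr = sample.int(100L, size = n, replace = TRUE),
                pos = sample.int(100L, size = n, replace = TRUE))

# The per-row objects built by duped() and fastduped(), respectively
rows_as_lists   <- do.call(Map, `names<-`(c(list, x), NULL))
rows_as_vectors <- do.call(Map, `names<-`(c(c, x), NULL))

# Approximate sizes in bytes
sizes <- sapply(list(data_frame = x,
                     rows_as_lists = rows_as_lists,
                     rows_as_vectors = rows_as_vectors),
                function(obj) as.numeric(object.size(obj)))
round(sizes / sizes[["data_frame"]], 1)  # size relative to x itself
```

Both intermediates are many times larger than x, which is why the number of jobs you can run at once is limited by memory rather than by CPUs.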