I have a list of ffdf objects that would take up about 76 GB of RAM if loaded into memory rather than accessed through the ff package. Their respective dimensions, from dim(), are:

> ffdfs |> sapply(dim)
         [,1]     [,2]     [,3]      [,4]      [,5]      [,6]      [,7]
[1,] 11478746 12854627 10398332 404567958 490530023 540375993 913792256
[2,]        3        3        3         3         3         3         3
         [,8]     [,9]     [,10]     [,11]    [,12]     [,13]     [,14]
[1,] 15296863 11588739 547337574 306972654 11544523 255644408 556900805
[2,]        3        3         3         3        3         3         3
        [,15]     [,16]    [,17]
[1,] 13409223 900436690 15184264
[2,]        3         3        3

I want to count the duplicated rows in each ffdf, so I did the following:

check_duplication <- sample_cols |> sapply(function(df) {
    df[c("chr","pos")] |> duplicated() |> sum()
})

It works but it is extremely slow.

I am on an HPC cluster with about 110 GB of RAM and 18 CPUs.

Is there any other option or setting I could adjust to speed up the process? Thank you.

Chris LAM
  • Have you read `vignette("parallel", "parallel")`? You could relatively easily parallelize the `sapply` call. – Mikael Jagan Dec 31 '21 at 02:35
  • Given 110GB RAM, you could actually afford to load everything into the memory? Then why not convert everything into `data.table`s and then do those computations? On my laptop with a smaller dataset (1e6 rows), the calculation is roughly 30 times faster after the conversion. – ekoam Dec 31 '21 at 03:34
  • @ekoam I actually tried loading everything into RAM, but when I tried to compute `check_duplication`, the process took a very long time and the whole R console got `Killed` by the Linux system. I am not sure why, but I suspect a memory issue, so I switched to `ff`, which at least returns a result after quite a few hours. – Chris LAM Dec 31 '21 at 04:05
  • @MikaelJagan I did try `parallel::mclapply`, but it came up with another issue: it does not return a result for every element. The warning is `Warning message: In mccollect(jobs) : 4 parallel jobs did not deliver results` – Chris LAM Dec 31 '21 at 05:06
  • Have a look at [my benchmark](https://stackoverflow.com/questions/42393658/lapply-vs-for-loop-performance-r/70023363#70023363) - `sapply` is more memory intensive than alternatives. Doesn't scale as well as alternatives. – Donald Seinen Dec 31 '21 at 05:09
  • @ChrisLAM Have you determined what is causing those 4 jobs to fail? – Mikael Jagan Dec 31 '21 at 05:40
  • @MikaelJagan No, I cannot be entirely sure whether it is about memory, but here is my reference: https://stackoverflow.com/questions/3426356/what-causes-an-r-script-to-get-killed which describes the same message I got: `Killed` – Chris LAM Dec 31 '21 at 06:09

1 Answer

Parallelization is a natural way to speed this up. It can be done at C level via data.table:

library("data.table")
#> data.table 1.14.2 using 4 threads (see ?getDTthreads).  Latest news: r-datatable.com
set.seed(1L)
x <- as.data.frame(replicate(2L, sample.int(100L, size = 1e+06L, replace = TRUE), simplify = FALSE))
y <- as.data.table(x)
microbenchmark::microbenchmark(duplicated(x), duplicated(y), times = 1000L)
#> Unit: milliseconds
#>           expr       min         lq       mean     median         uq       max neval
#>  duplicated(x) 449.27693 596.242890 622.160423 625.610267 644.682319 734.39741  1000
#>  duplicated(y)   5.75722   6.347518   7.413925   6.874593   7.407695  58.12131  1000

The benchmark here shows that duplicated is much faster when applied to a data.table instead of an equivalent data frame. Of course, how much faster depends on the number of CPUs that you make available to data.table (see ?setDTthreads).
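
As a rough illustration of the thread setting mentioned here, the sketch below raises the thread count for the duplicate-counting step and then restores the previous value. The value 18 is an assumption taken from the CPU count in the question; data.table may use fewer threads than requested, depending on what the machine or scheduler actually allows.

```r
library(data.table)

# Assumption: 18 CPUs are allotted to the job (figure taken from the question).
# setDTthreads() returns the previous setting, so it can be restored later.
old_threads <- setDTthreads(threads = 18L)

# Report how many threads data.table will actually use.
n_threads <- getDTthreads()
print(n_threads)

# ... run the duplicated() calls on data.tables here ...

# Restore the previous setting once the heavy step is done.
setDTthreads(old_threads)
```

Calling getDTthreads(verbose = TRUE) prints more detail about how the effective thread count was determined.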

If you go the data.table route, then you would process your 17 data frames like so:

nduped <- function(ffd) {
  x <- as.data.frame(ffd[c("chr", "pos")])
  setDT(x)
  n <- sum(duplicated(x))
  rm(x)
  gc(FALSE)
  n
}
vapply(list_of_ffd, nduped, 0L)

Here, we are using setDT rather than as.data.table to perform an in-place coercion from data frame to data.table, and we are using rm and gc to free the memory occupied by x before reading another data frame into memory.

If, for whatever reason, data.table is not an option, then you can stick to using the duplicated method for data frames, namely duplicated.data.frame. It is not parallelized at C level, so you would need to parallelize at R level, using, e.g., mclapply to assign your 17 data frames to batches and process those batches in parallel:

nduped <- function(ffd) {
  x <- as.data.frame(ffd[c("chr", "pos")])
  n <- sum(duplicated(x))
  rm(x)
  gc(FALSE)
  n
}
unlist(parallel::mclapply(list_of_ffd, nduped, ...))

This option is slower and consumes more memory than you might expect. Fortunately, there is room for optimization. The rest of this answer highlights some of the main issues and ways to get around them. Feel free to stop reading if you've already settled on data.table.
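
Because the asker reported that some parallel jobs did not deliver results, a minimal sketch of a defensive mclapply call may help. Everything here is illustrative: `toy` is made-up data standing in for the 17 data frames, `count_dups` is a hypothetical helper, and `mc.cores = 2L` is an arbitrary choice. The sketch assumes that a job whose worker process died typically comes back as NULL and that a job that raised an R error comes back as a "try-error" object.

```r
library(parallel)

# Toy stand-ins for the real data frames (hypothetical data).
set.seed(1L)
toy <- replicate(
  4L,
  data.frame(chr = sample.int(3L, 1000L, replace = TRUE),
             pos = sample.int(50L, 1000L, replace = TRUE)),
  simplify = FALSE
)

# Hypothetical helper: count duplicated (chr, pos) rows in one data frame.
count_dups <- function(df) sum(duplicated(df[c("chr", "pos")]))

# Forking is not available on Windows, so fall back to one core there.
cores <- if (.Platform$OS.type == "windows") 1L else 2L

# mc.preschedule = FALSE forks one job per element instead of pre-assigning
# elements to cores, so a single failed worker affects only its own element.
res <- mclapply(toy, count_dups, mc.cores = cores, mc.preschedule = FALSE)

# Flag elements that did not deliver a usable result ...
failed <- vapply(res, function(r) is.null(r) || inherits(r, "try-error"), NA)

# ... and rerun only those serially, where peak memory use is lowest.
if (any(failed)) res[failed] <- lapply(toy[failed], count_dups)

counts <- unlist(res)
print(counts)
```

Rerunning only the failed elements avoids repeating hours of work when one or two large data frames exhaust memory.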

  • Since you have 18 CPUs, you can try to process all 17 data frames simultaneously, but you might encounter out-of-memory issues as a result of reading all 17 data frames into memory at once. Increasing the batch size (i.e., distributing the 17 jobs across fewer than 17 CPUs) should help.

  • Since your 17 data frames vary widely in length (number of rows), randomly assigning them to roughly equally sized batches is probably not a good strategy. You could decrease the overall run time by batching shorter data frames together and not batching longer data frames together. mclapply has an affinity.list argument giving you this control. Ideally, each batch should require the same amount of processing time.

  • The amount of memory that each job uses is actually at least two times greater than the amount needed to store the data frame x, because duplicated.data.frame copies its argument:

    x <- data.frame(chr = rep(1:2, times = 5L), pos = rep(1:2, each = 5L))
    tracemem(x)
    
    [1] "<0x14babad48>"
    
    invisible(duplicated(x))
    
    tracemem[0x14babad48 -> 0x14babc088]: as.list.data.frame as.list vapply duplicated.data.frame duplicated
    

    The copy happens inside of the vapply call in the body of the method:

    duplicated.data.frame
    
    function (x, incomparables = FALSE, fromLast = FALSE, ...) 
    {
        if (!isFALSE(incomparables)) 
            .NotYetUsed("incomparables != FALSE")
        if (length(x) != 1L) {
            if (any(i <- vapply(x, is.factor, NA))) 
                x[i] <- lapply(x[i], as.numeric)
            duplicated(do.call(Map, `names<-`(c(list, x), NULL)), 
                fromLast = fromLast)
        }
        else duplicated(x[[1L]], fromLast = fromLast, ...)
    }
    <bytecode: 0x15b44f0f0>
    <environment: namespace:base>
    

    That vapply call is completely avoidable: you should already know whether chr and pos are factors. I would suggest defining a replacement for duplicated.data.frame that does only what is necessary given your use case. For example, if you know that chr and pos are not factors, then you might assign

    duped <- function(x) {
      duplicated.default(do.call(Map, `names<-`(c(list, x), NULL)))
    }
    

    and compute sum(duped(x)) instead of sum(duplicated(x)). In fact, you could do slightly better by replacing list with c:

    fastduped <- function(x) {
      duplicated.default(do.call(Map, `names<-`(c(c, x), NULL)))
    }
    

    Using c here causes rows of the data frame x to be stored and compared as atomic vectors rather than as lists. In other words, fastduped(x) is doing

    duplicated.default(<length-'m' list of length-'n' atomic vectors>)
    

    whereas duped(x) is doing

    duplicated.default(<length-'m' list of length-'n' lists of length-1 atomic vectors>)
    

    where m = nrow(x) and n = length(x). The latter is slower and consumes more memory, and there is a warning in ?duplicated saying as much:

    Using this for lists is potentially slow, especially if the elements are not atomic vectors (see ‘vector’) or differ only in their attributes. In the worst case it is O(n^2).

    Computing sum(fastduped(x)) instead of sum(duplicated(x)) should increase the number of data frames that you can process simultaneously without running out of memory. FWIW, here is a benchmark comparing the run times of duplicated, duped, and fastduped (it says nothing about memory usage):

    set.seed(1L)
    x <- as.data.frame(replicate(2L, sample.int(100L, size = 1e+06L, replace = TRUE), simplify = FALSE))
    microbenchmark::microbenchmark(duplicated(x), duped(x), fastduped(x), times = 1000L)
    
    Unit: milliseconds
              expr      min       lq     mean   median       uq      max neval
     duplicated(x) 521.7263 598.9353 688.7286 628.8813 769.6100 1324.458  1000
          duped(x) 521.3863 598.7390 682.1298 627.1445 764.7331 1373.712  1000
      fastduped(x) 431.0359 528.6613 594.1534 553.7739 609.6241 1123.542  1000
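
The batching advice above can be sketched with a simple greedy heuristic: take the data frames from longest to shortest and always add the next one to the batch with the smallest total so far. This is only one possible strategy, not something the answer prescribes. It assumes that row count is a reasonable proxy for processing time; `lpt_batches` is a hypothetical helper, and six batches is an arbitrary choice. The row counts are the ones printed in the question.

```r
# Row counts of the 17 ffdf objects, copied from the question's dim() output.
nrows <- c(11478746, 12854627, 10398332, 404567958, 490530023, 540375993,
           913792256, 15296863, 11588739, 547337574, 306972654, 11544523,
           255644408, 556900805, 13409223, 900436690, 15184264)

# Hypothetical helper: greedy "longest first" assignment of jobs to k batches.
lpt_batches <- function(size, k) {
  load <- numeric(k)                # running total of rows per batch
  batch <- integer(length(size))    # batch index assigned to each job
  for (i in order(size, decreasing = TRUE)) {
    j <- which.min(load)            # currently lightest batch
    batch[i] <- j
    load[j] <- load[j] + size[i]
  }
  batch
}

n_batches <- 6L                     # assumption: fewer batches than CPUs to limit memory use
batch <- lpt_batches(nrows, n_batches)

# Total rows per batch, and which data frames ended up in each batch.
loads <- tapply(nrows, batch, sum)
print(loads)
print(split(seq_along(nrows), batch))
```

A vector like `batch` might then be supplied as the affinity.list argument of mclapply, which expects CPU IDs per element. Check ?mclapply for the exact semantics on your system before relying on it.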
    
Mikael Jagan