
I'm trying to move from a serial to a parallel approach to accomplish some multivariate time series analysis tasks on a large data.table. The table contains data for many different groups, and I want to replace a for loop with a foreach loop using the doParallel package to take advantage of the multicore processor installed.

The problem I am experiencing relates to memory and how the new R processes seem to consume large quantities of it. I think that what is happening is that the large data.table containing ALL data is copied into each new process, hence I run out of RAM and Windows starts swapping to disk.

I've created a simplified reproducible example which replicates my problem, but with less data and less analysis inside the loop. It would be ideal if a solution existed which could farm out the data to the worker processes only on demand, or share the memory already used between cores. Alternatively, some kind of solution may already exist to split the big data into 4 chunks and pass these to the cores so each has a subset to work with.

A similar question has previously been posted here on Stack Overflow; however, I cannot make use of the bigmemory solution offered there as my data contains a character field. I will look further into the iterators package, but I'd appreciate any suggestions from members with experience of this problem in practice.

rm(list=ls())
library(data.table)
num.series = 40    # can customise the size of the problem (x10 eats my RAM)
num.periods = 200  # can customise the size of the problem (x10 eats my RAM)
dt.all = data.table(
    grp     = rep(1:num.series,each=num.periods), 
    pd      = rep(1:num.periods, num.series), 
    y       = rnorm(num.series * num.periods),
    x1      = rnorm(num.series * num.periods),
    x2      = rnorm(num.series * num.periods)
) 
dt.all[,y_lag := c(NA, head(y, -1)), by = c("grp")]

f_lm = function(dt.sub, grp) {
    my.model = lm("y ~ y_lag + x1 + x2 ", data = dt.sub)
    coef = summary(my.model)$coefficients
    data.table(grp, variable = rownames(coef), coef)
}

library(doParallel)  
registerDoParallel(4) 
foreach(grp=unique(dt.all$grp), .packages="data.table", .combine="rbind")  %dopar%
{
    dt.sub = dt.all[grp == grp]
    f_lm(dt.sub, grp)
}
detach(package:doParallel)
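
For what it's worth, the kind of chunked approach I have in mind would look roughly like this (untested sketch; I'm not sure it actually prevents each worker from receiving more than its own chunk):

# Rough sketch of the chunking idea: build per-group subsets up front,
# then iterate over the list so each task only references its own chunk
# and dt.all itself is never named inside the loop body.
grp.ids <- unique(dt.all$grp)
chunks  <- lapply(grp.ids, function(g) dt.all[grp == g])

results <- foreach(dt.sub = chunks, g = grp.ids,
                   .packages = "data.table", .combine = "rbind") %dopar% {
    f_lm(dt.sub, g)
}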
Matt Weller
  • The statement dt.sub = dt.all[grp == grp] is going to cause a lot of copies of dt.sub to be floating around. Have you tried using the subset= argument in your lm call to avoid having to do this? Also I'm wondering what is done with the return value of f_lm when it is calculated; it isn't assigned to anything. – James King Dec 28 '13 at 02:01
  • Yes, this example is flawed. In reality I have to narrow down the possible formula for the upper and lower bound for `stepAIC`, filter out rows with `NA`s, obtain the best model, record coefficients, calculate elasticities and in-sample accuracy etc. So there are a few steps in addition to just running `lm`, and awkwardly I will be returning a list of `data.table`s to `foreach`. I will certainly give `subset` some consideration, thanks for the suggestion. I can handle copies of `dt.sub`, it's just `dt.all` which can't have many copies in memory. – Matt Weller Dec 28 '13 at 02:22
  • I'm surprised that multiple copies of dt.all would be created. According to the documentation for multicore "Modern operating systems use copy-on-write approach which makes this so appealing for parallel computation since only objects modified during the computation will be actually copied and all other memory is directly shared." I know dt.all would not reside in memory more than once on linux. Not sure about windows. – James King Dec 28 '13 at 03:05
  • doParallel: "The multicore functionality supports multiple workers only on those operating systems that support the fork system call; this excludes Windows. By default, doParallel uses multicore functionality on Unix-like systems and snow functionality on Windows". I don't have documented evidence as to whether the data in the environment is copied, but given that the required packages must be specified and, I presume, loaded into each instance of R, the data would presumably need to be copied too. I guess it's a Windows thing. – Matt Weller Dec 28 '13 at 03:49
  • Yes. I have been told this functionality works better on Unix (because of the fork command) but switching may not be an option for you. – James King Dec 28 '13 at 03:51
  • Luckily it is an option as we have a Sun HPC cluster at the University but given my Unix knowledge and lack of experience of MPI/parallel I hoped to get things working on a smaller scale on the 64-bit Windows laptop first. – Matt Weller Dec 28 '13 at 04:01

3 Answers


Iterators can help to reduce the amount of memory that needs to be passed to the workers of a parallel program. Since you're using the data.table package, it's a good idea to use iterators and combine functions that are optimized for data.table objects. For example, here is a function like isplit that works on data.table objects:

isplitDT <- function(x, colname, vals) {
  colname <- as.name(colname)
  ival <- iter(vals)
  nextEl <- function() {
    val <- nextElem(ival)
    list(value=eval(bquote(x[.(colname) == .(val)])), key=val)
  }
  obj <- list(nextElem=nextEl)
  class(obj) <- c('abstractiter', 'iter')
  obj
}

Note that it isn't completely compatible with isplit, since the arguments and return value are slightly different. There may also be a better way to subset the data.table, but I think this is more efficient than using isplit.

Here is your example using isplitDT and a combine function that uses rbindlist which combines data.tables faster than rbind:

dtcomb <- function(...) {
  rbindlist(list(...))
}

results <- 
  foreach(dt.sub=isplitDT(dt.all, 'grp', unique(dt.all$grp)),
          .combine='dtcomb', .multicombine=TRUE,
          .packages='data.table') %dopar% {
    f_lm(dt.sub$value, dt.sub$key)
  }

Update

I wrote a new iterator function called isplitDT2 which performs much better than isplitDT but requires that the data.table have a key:

isplitDT2 <- function(x, vals) {
  ival <- iter(vals)
  nextEl <- function() {
    val <- nextElem(ival)
    list(value=x[val], key=val)
  }
  obj <- list(nextElem=nextEl)
  class(obj) <- c('abstractiter', 'iter')
  obj
}

This is called as:

setkey(dt.all, grp)
results <-
  foreach(dt.sub=isplitDT2(dt.all, levels(dt.all$grp)),
          .combine='dtcomb', .multicombine=TRUE,
          .packages='data.table') %dopar% {
    f_lm(dt.sub$value, dt.sub$key)
  }

This uses a binary search to subset dt.all rather than a vector scan, and so is more efficient. I don't know why isplitDT would use more memory, however. Since you're using doParallel, which doesn't call the iterator on-the-fly as it sends out tasks, you might want to experiment with splitting dt.all and then removing it to reduce your memory usage:

dt.split <- as.list(isplitDT2(dt.all, levels(dt.all$grp)))
rm(dt.all)
gc()
results <- 
  foreach(dt.sub=dt.split,
          .combine='dtcomb', .multicombine=TRUE,
          .packages='data.table') %dopar% {
    f_lm(dt.sub$value, dt.sub$key)
  }

This may help by reducing the amount of memory needed by the master process during the execution of the foreach loop, while still only sending the required data to the workers. If you still have memory problems, you could also try using doMPI or doRedis, both of which get iterator values as needed, rather than all at once, making them more memory efficient.
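
For example, a doMPI version of the same loop could look something like this (a sketch; it assumes doMPI and a working MPI installation, which may not apply to your Windows laptop):

library(doMPI)
cl <- startMPIcluster(count = 4)   # assumes an MPI runtime is available
registerDoMPI(cl)
# doMPI pulls values from the iterator as tasks are dispatched,
# so dt.all is never split up all at once on the master
results <-
  foreach(dt.sub=isplitDT2(dt.all, levels(dt.all$grp)),
          .combine='dtcomb', .multicombine=TRUE,
          .packages='data.table') %dopar% {
    f_lm(dt.sub$value, dt.sub$key)
  }
closeCluster(cl)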

Steve Weston
  • Nice solution Steve, especially the combination function and the generic nature of the solution, thanks. Unfortunately I don't get to accept my own answer as this one is clearly better! – Matt Weller Dec 28 '13 at 22:20
  • I tried using this approach on my real data and it caused the laptop to run out of memory. Same issue on the cluster with 10GB RAM. So I tested both solutions against the original problem specification and was surprised to find that the `system.time` comparison was 49s for `isplitDT` with `dtcomb` against 14.76s for the basic `isplit` with `rbind`. So I ran 4 tests to evaluate the combinations of `rbind`/`rbindlist` x `isplit`/`isplitDT`. This pointed to the slowdown being due to `isplitDT` as opposed to `rbindlist`. Will post test code if reqd. Any thoughts? – Matt Weller Dec 30 '13 at 01:00

The answer requires the iterators package and the use of isplit, which is similar to split in that it breaks the main data object into chunks based on one or more factor columns. The foreach loop iterates through the chunks of data, passing only the subset out to the worker process rather than the whole table.

So the differences in the code are as follows:

library(iterators)
dt.all = data.table(
    grp     = factor(rep(1:num.series, each = num.periods)),  # grp column is a factor
    pd      = rep(1:num.periods, num.series), 
    y       = rnorm(num.series * num.periods),
    x1      = rnorm(num.series * num.periods),
    x2      = rnorm(num.series * num.periods)
) 

results = 
    foreach(dt.sub = isplit(dt.all, dt.all$grp), .packages = "data.table", .combine = "rbind") %dopar%
    {
        f_lm(dt.sub$value, dt.sub$key[[1]])
    }

The result of the isplit is that dt.sub is now a list with 2 elements: the key is itself a list of the values used to split, and the value contains the subset as a data.table.
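
You can see this structure by pulling a single element from the iterator by hand (just a quick check, not part of the solution):

library(iterators)
it <- isplit(dt.all, dt.all$grp)
el <- nextElem(it)   # first chunk
el$key               # list holding the grp level for this chunk
head(el$value)       # the rows of dt.all belonging to that group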

Credit for this solution goes to an SO answer by David and to a response by Russell to my question on an excellent blog post about iterators.

------------------------------------ EDIT ------------------------------------

To test the performance of isplitDT vs isplit and rbindlist vs rbind, the following code was used:

rm(list=ls())
library(data.table); library(iterators); library(doParallel)
num.series = 400
num.periods = 2000
dt.all = data.table(
    grp     = factor(rep(1:num.series,each=num.periods)), 
    pd      = rep(1:num.periods, num.series), 
    y       = rnorm(num.series * num.periods),
    x1      = rnorm(num.series * num.periods),
    x2      = rnorm(num.series * num.periods)
)
dt.all[,y_lag := c(NA, head(y, -1)), by = c("grp")]

f_lm = function(dt.sub, grp) {
    my.model = lm("y ~ y_lag + x1 + x2 ", data = dt.sub)
    coef = summary(my.model)$coefficients
    data.table(grp, variable = rownames(coef), coef)
}

registerDoParallel(8)

isplitDT <- function(x, colname, vals) {
  colname <- as.name(colname)
  ival <- iter(vals)
  nextEl <- function() {
    val <- nextElem(ival)
    list(value=eval(bquote(x[.(colname) == .(val)])), key=val)
  }
  obj <- list(nextElem=nextEl)
  class(obj) <- c('abstractiter', 'iter')
  obj
}

dtcomb <- function(...) {
  rbindlist(list(...))
}

# isplit/rbind
st1 = system.time(results <- foreach(dt.sub=isplit(dt.all, dt.all$grp),
                    .combine="rbind",
                    .packages="data.table") %dopar% {
    f_lm(dt.sub$value, dt.sub$key[[1]])
})
# isplit/rbindlist
st2 = system.time(results <- foreach(dt.sub=isplit(dt.all, dt.all$grp),
                    .combine='dtcomb', .multicombine=TRUE,
                    .packages="data.table") %dopar% {
    f_lm(dt.sub$value, dt.sub$key[[1]])
})
# isplitDT/rbind
st3 = system.time(results <- foreach(dt.sub=isplitDT(dt.all, 'grp', unique(dt.all$grp)),
                    .combine="rbind",
                    .packages='data.table') %dopar% {
    f_lm(dt.sub$value, dt.sub$key)
})
# isplitDT/rbindlist
st4 = system.time(results <- foreach(dt.sub=isplitDT(dt.all, 'grp', unique(dt.all$grp)),
                    .combine='dtcomb', .multicombine=TRUE,
                    .packages='data.table') %dopar% {
    f_lm(dt.sub$value, dt.sub$key)
})

rbind(st1, st2, st3, st4)

This gives the following timings:

    user.self sys.self elapsed user.child sys.child
st1     12.08     1.53   14.66         NA        NA
st2     12.05     1.41   14.08         NA        NA
st3     45.33     2.40   48.14         NA        NA
st4     45.00     3.30   48.70         NA        NA

------------------------------------ EDIT 2 ------------------------------------

Thanks to Steve's updated answer and the function isplitDT2, which makes use of the key on the data.table, we have a clear new winner in terms of speed. Running microbenchmark to compare against my original solution (in this answer) shows around a 7-fold improvement from isplitDT2 with rbindlist. Memory usage has not yet been compared directly, but the performance gain leads me to accept the answer at last.
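
A comparison of that kind can be run along these lines (a sketch; the expression names and the number of repetitions are illustrative):

library(microbenchmark)
setkey(dt.all, grp)   # isplitDT2 requires a keyed data.table

microbenchmark(
    isplit_rbind = foreach(dt.sub = isplit(dt.all, dt.all$grp),
                           .combine = "rbind", .packages = "data.table") %dopar% {
        f_lm(dt.sub$value, dt.sub$key[[1]])
    },
    isplitDT2_rbindlist = foreach(dt.sub = isplitDT2(dt.all, levels(dt.all$grp)),
                                  .combine = "dtcomb", .multicombine = TRUE,
                                  .packages = "data.table") %dopar% {
        f_lm(dt.sub$value, dt.sub$key)
    },
    times = 5
)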

Matt Weller
  • Did isplitDT2 from my updated answer do better either in speed or memory use? – Steve Weston Jan 02 '14 at 14:18
  • @SteveWeston: thanks for the update which performs considerably better on the toy example in the question. I'll try it now on the real dataset and assess memory usage too. – Matt Weller Jan 03 '14 at 01:58

Holding everything in memory is one of those (aargh, annoying) things that R programmers have to learn to deal with. It's pretty easy to imagine your code example as either memory-bound or CPU-bound, and you'll need to figure that out before trying to apply workarounds.
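
A rough way to get a feel for which one you're dealing with (sketch, using base R only):

print(object.size(dt.all), units = "Mb")   # how big is the data itself?
gc(reset = TRUE)
one.fit <- f_lm(dt.all[grp == 1], 1)       # a single model run on one group
gc()                                       # compare "max used" with the table size above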

Assuming the memory is being consumed by your dataset (dt.all) and not during the actual model run, you might be able to release enough memory for the worker processes to parallelize:

foreach(grp=unique(dt.all$grp), .packages="data.table", .combine="rbind")  %dopar%
{
    dt.sub = dt.all[grp == grp]
    rm(dt.all)
    gc()
    f_lm(dt.sub, grp)
}

However, this assumes that your working set (dt.sub) is small enough that you can fit more than one of them in memory at a time. It isn't hard to imagine a problem set too large for that. Also, and this is really annoying, all the workers are going to fire up at one time and kill your machine anyway, so you might need to make them pause for a couple seconds to allow other children to load up and release memory.
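
A crude way to stagger them is to sleep for a short random interval at the start of each task (sketch only; the 0-5 second range is arbitrary, and I've renamed the loop variable to sidestep the grp == grp name clash raised in the comments on the question):

foreach(g = unique(dt.all$grp), .packages = "data.table", .combine = "rbind") %dopar%
{
    Sys.sleep(runif(1, 0, 5))   # stagger worker start-up by up to 5 seconds
    dt.sub = dt.all[grp == g]
    f_lm(dt.sub, g)
}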

It's desperately stupid and brute-force, but I have handled this exact problem by writing the subsets out to disk as individual data files and then using a batch script to run my computations in parallel.
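
In your case that would amount to something like the following (a sketch; the directory and file naming are made up, and I'm using a foreach loop here rather than the batch script I actually used):

# write one file per group, then drop the big table from the master's memory
dir.create("chunks", showWarnings = FALSE)
for (g in unique(dt.all$grp)) {
    saveRDS(dt.all[grp == g], file.path("chunks", paste0("grp_", g, ".rds")))
}
rm(dt.all); gc()

# each task reads back only its own chunk from disk
files <- list.files("chunks", pattern = "^grp_.*\\.rds$", full.names = TRUE)
results <- foreach(f = files, .packages = "data.table", .combine = "rbind") %dopar% {
    dt.sub <- readRDS(f)
    f_lm(dt.sub, dt.sub$grp[1])
}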

khoxsey
  • Very good point regarding deleting the `dt.all` and the phasing of child loads! Luckily my `dt.sub` isn't so huge (313 x 100) as to consume tonnes of memory. Writing to disk would be an option as you suggest, I'll test it tomorrow. Currently I'm toying with parallel computing on my laptop, in production I'll be able to use the University HPC so should be able to use more memory and nodes when I understand what I'm doing! – Matt Weller Dec 28 '13 at 02:33
  • If your working set is manageable, your major issue is likely to be the trade-off between time to create the subset and time to run the model. That trade-off will likely dictate your approach. GL! – khoxsey Dec 28 '13 at 02:37
  • Removing dt.all will cause an error if the number of tasks is greater than the number of workers, since tasks will start to fail when any worker executes a subsequent task. This is an even bigger problem when using the default doSEQ backend, since the master's copy is removed after executing the first task. – Steve Weston Dec 28 '13 at 23:24