39

I have a simulation that has a huge aggregate and combine step right in the middle. I prototyped this process using plyr's ddply() function which works great for a huge percentage of my needs. But I need this aggregation step to be faster since I have to run 10K simulations. I'm already scaling the simulations in parallel but if this one step were faster I could greatly decrease the number of nodes I need.

Here's a reasonable simplification of what I am trying to do:

library(Hmisc)

# Set up some example data
year <-    sample(1970:2008, 1e6, rep=T)
state <-   sample(1:50, 1e6, rep=T)
group1 <-  sample(1:6, 1e6, rep=T)
group2 <-  sample(1:3, 1e6, rep=T)
myFact <-  rnorm(100, 15, 1e6)
weights <- rnorm(1e6)
myDF <- data.frame(year, state, group1, group2, myFact, weights)

# this is the step I want to make faster
system.time(aggregateDF <- ddply(myDF, c("year", "state", "group1", "group2"),
                     function(df) wtd.mean(df$myFact, weights=df$weights)
                                 )
           )

All tips or suggestions are appreciated!

Matt Dowle
  • 58,872
  • 22
  • 166
  • 224
JD Long
  • 59,675
  • 58
  • 202
  • 294

6 Answers6

37

Instead of the normal R data frame, you can use a immutable data frame which returns pointers to the original when you subset and can be much faster:

idf <- idata.frame(myDF)
system.time(aggregateDF <- ddply(idf, c("year", "state", "group1", "group2"),
   function(df) wtd.mean(df$myFact, weights=df$weights)))

#    user  system elapsed 
# 18.032   0.416  19.250 

If I was to write a plyr function customised exactly to this situation, I'd do something like this:

system.time({
  ids <- id(myDF[c("year", "state", "group1", "group2")], drop = TRUE)
  data <- as.matrix(myDF[c("myFact", "weights")])
  indices <- plyr:::split_indices(seq_len(nrow(data)), ids, n = attr(ids, "n"))

  fun <- function(rows) {
    weighted.mean(data[rows, 1], data[rows, 2])
  }
  values <- vapply(indices, fun, numeric(1))

  labels <- myDF[match(seq_len(attr(ids, "n")), ids), 
    c("year", "state", "group1", "group2")]
  aggregateDF <- cbind(labels, values)
})

# user  system elapsed 
# 2.04    0.29    2.33 

It's so much faster because it avoids copying the data, only extracting the subset needed for each computation when it's computed. Switching the data to matrix form gives another speed boost because matrix subsetting is much faster than data frame subsetting.

JD Long
  • 59,675
  • 58
  • 202
  • 294
hadley
  • 102,019
  • 32
  • 183
  • 245
  • 5
    `idata.frame` was added in plyr 1.0. – hadley Sep 10 '10 at 15:32
  • I had messed around with making indexes and such with data.table and had all but given up on that idea. I was hoping for 50% improvement. This far exceeds my expectations. – JD Long Sep 10 '10 at 16:13
  • having a little trouble making this run right... But I'm learning as I go... I had changed data to myDF but not sure where the issue is.. – JD Long Sep 10 '10 at 16:30
  • the code above seems to be missing the creation of the matrix 'data' (if I'm reading this right) possibly a data <- as.matrix(myDF[5:6]) at the top? – JD Long Sep 10 '10 at 16:55
  • I know this is from 4 years ago so it's a longshot, but have this question and now the `indices` code doesn't seem to run. Anyone else have this issue? Has updates to plyr changed how `plyr:::split_indices` works? Getting this error `Error in plyr:::split_indices(seq_len(nrow(data)), ids, n = attr(ids, : unused argument (ids)` – lamecicle Nov 05 '14 at 12:11
  • 1
    @lamecicle you're much better off using dplyr now – hadley Nov 05 '14 at 15:01
  • @hadley Thanks for that, finding it hard to keep up with the latest methods for data management and manipulation, any suggestions of a good form or something? – lamecicle Nov 05 '14 at 17:52
  • @lamecicle #rstats hashtag on twitter maybe – hadley Nov 05 '14 at 19:42
28

Further 2x speedup and more concise code:

library(data.table)
dtb <- data.table(myDF, key="year,state,group1,group2")
system.time( 
  res <- dtb[, weighted.mean(myFact, weights), by=list(year, state, group1, group2)] 
)
#   user  system elapsed 
#  0.950   0.050   1.007 

My first post, so please be nice ;)


From data.table v1.9.2, setDT function is exported that'll convert data.frame to data.table by reference (in keeping with data.table parlance - all set* functions modify the object by reference). This means, no unnecessary copying, and is therefore fast. You can time it, but it'll be negligent.

require(data.table)
system.time({
  setDT(myDF)
  res <- myDF[, weighted.mean(myFact, weights), 
             by=list(year, state, group1, group2)] 
})
#   user  system elapsed 
#  0.970   0.024   1.015 

This is as opposed to 1.264 seconds with OP's solution above, where data.table(.) is used to create dtb.

Arun
  • 116,683
  • 26
  • 284
  • 387
datasmurf
  • 1,383
  • 2
  • 10
  • 7
  • Good post! Thanks for the answer. To be consistent with the other methods, however, the step that creates the data table and index should be inside of the system.time() step. – JD Long Oct 29 '10 at 22:20
  • 2
    Indeed, but it remains the fastest though. It would be nice to have an option in ddply to operate on data.tables or use data.tables under the hood (I just discovered data.table by looking for solutions to the very same problem, but I would prefer a more ddply-like syntax for this case). – datasmurf Oct 30 '10 at 20:49
10

I would profile with base R

g <- with(myDF, paste(year, state, group1, group2))
x <- with(myDF, c(tapply(weights * myFact, g, sum) / tapply(weights, g, sum)))
aggregateDF <- myDF[match(names(x), g), c("year", "state", "group1", "group2")]
aggregateDF$V1 <- x

On my machine it takes 5sec compare to 67sec with original code.

EDIT Just found another speed up with rowsum function:

g <- with(myDF, paste(year, state, group1, group2))
X <- with(myDF, rowsum(data.frame(a=weights*myFact, b=weights), g))
x <- X$a/X$b
aggregateDF2 <- myDF[match(rownames(X), g), c("year", "state", "group1", "group2")]
aggregateDF2$V1 <- x

It takes 3sec!

Marek
  • 49,472
  • 15
  • 99
  • 121
  • 2
    Second one takes 5 seconds on my computer, so plyr is still narrowly beating base ;) (Plus it orders the rows correctly) – hadley Sep 10 '10 at 16:33
  • 2
    But thanks for the pointer to `rowsum` - it's so hard to keep up with the plethora of aggregation functions in base R. – hadley Sep 10 '10 at 16:34
  • I knew there had to be a tapply way of doing this as well but I was struggling to figure it out. I generally have this struggle with the apply family. – JD Long Sep 10 '10 at 16:36
  • @hadley Agree. Some time ago I found replacement for `apply(X,1,which.max)` in `col.max`. I wonder how many other functions this type exists (like `pmax`/`pmin`), optimized for matrices objects by using `.Internal` level. – Marek Sep 13 '10 at 08:17
  • 1
    @Marek: See http://4dpiecharts.com/2010/09/14/which-functions-in-r-base-call-internal-code/ – Richie Cotton Sep 15 '10 at 09:41
  • @Richie It's exactly what I plan to do in the weekend :) And the simplicity of this code is outstanding – Marek Sep 15 '10 at 10:43
7

Are you using the latest version of plyr (note: this hasn't made it to all the CRAN mirrors yet)? If so, you could just run this in parallel.

Here's the llply example, but the same should apply to ddply:

  x <- seq_len(20)
  wait <- function(i) Sys.sleep(0.1)
  system.time(llply(x, wait))
  #  user  system elapsed 
  # 0.007   0.005   2.005 

  library(doMC)
  registerDoMC(2) 
  system.time(llply(x, wait, .parallel = TRUE))
  #  user  system elapsed 
  # 0.020   0.011   1.038 

Edit:

Well, other looping approaches are worse, so this probably requires either (a) C/C++ code or (b) a more fundamental rethinking of how you're doing it. I didn't even try using by() because that's very slow in my experience.

groups <- unique(myDF[,c("year", "state", "group1", "group2")])
system.time(
aggregateDF <- do.call("rbind", lapply(1:nrow(groups), function(i) {
   df.tmp <- myDF[myDF$year==groups[i,"year"] & myDF$state==groups[i,"state"] & myDF$group1==groups[i,"group1"] & myDF$group2==groups[i,"group2"],]
   cbind(groups[i,], wtd.mean(df.tmp$myFact, weights=df.tmp$weights))
}))
)

aggregateDF <- data.frame()
system.time(
for(i in 1:nrow(groups)) {
   df.tmp <- myDF[myDF$year==groups[i,"year"] & myDF$state==groups[i,"state"] & myDF$group1==groups[i,"group1"] & myDF$group2==groups[i,"group2"],]
   aggregateDF <- rbind(aggregateDF, data.frame(cbind(groups[i,], wtd.mean(df.tmp$myFact, weights=df.tmp$weights))))
}
)
Shane
  • 98,550
  • 35
  • 224
  • 217
  • that helps me in the single machine case but I'm already blowing this out to Hadoop & oversubscribing each node (more processes than processors). But I'm very pleased to see parallelization making it into plyr! – JD Long Sep 10 '10 at 14:55
5

I usually use an index vector with tapply when the function being applied has multiple vector args:

system.time(tapply(1:nrow(myDF), myDF[c('year', 'state', 'group1', 'group2')], function(s) weighted.mean(myDF$myFact[s], myDF$weights[s])))
# user  system elapsed 
# 1.36    0.08    1.44 

I use a simple wrapper which is equivalent but hides the mess:

tmapply(list(myDF$myFact, myDF$weights), myDF[c('year', 'state', 'group1', 'group2')], weighted.mean)

Edited to include tmapply for comment below:

tmapply = function(XS, INDEX, FUN, ..., simplify=T) {
  FUN = match.fun(FUN)
  if (!is.list(XS))
    XS = list(XS)
  tapply(1:length(XS[[1L]]), INDEX, function(s, ...)
    do.call(FUN, c(lapply(XS, `[`, s), list(...))), ..., simplify=simplify)
}
Charles
  • 4,389
  • 2
  • 16
  • 13
  • 1
    Just to add: `as.data.frame(as.table(RESULTS))` it's easy way to create `data.frame` from output. – Marek Sep 15 '10 at 08:37
  • Is this the `tmapply` that you're using? https://stat.ethz.ch/pipermail/r-help/2002-October/025773.html – Shane Dec 29 '10 at 15:22
  • There the `m` is being used to refer to "matrix". In my case it was meant more as a mashup of `tapply` and `mapply` - ie when you want to calculate a grouped aggregate using multiple inputs (cor, weighted.mean etc). – Charles Dec 29 '10 at 19:25
1

Probably the fastest solution is to use collapse::fgroup_by. It's 8x faster than data.table:

library(collapse)
myDF %>% 
  fgroup_by(year, state, group1, group2) %>% 
  fsummarise(myFact = fmean(myFact, weights))

bm <- bench::mark(
  collapse = myDF %>% 
  fgroup_by(year, state, group1, group2) %>% 
  fsummarise(myFact = fmean(myFact, weights)),
  data.table = dtb[, weighted.mean(myFact, weights), by=list(year, state, group1, group2)],
  check = FALSE)

#> bm
#  expression      min   median itr/se…¹ mem_a…² gc/se…³ n_itr  n_gc total…⁴
#1 collapse      101ms    105ms     9.10  8.84MB    0        5     0   549ms
#2 data.table    852ms    852ms     1.17 24.22MB    2.35     1     2   852ms
Maël
  • 45,206
  • 3
  • 29
  • 67