Following this post: multicore and data.table in R, I was wondering whether there is a way to use all cores when using data.table. Calculations by group, typically, could be parallelized. It seems that plyr allows such operations by design.
3 Answers
The first thing to check is that `data.table` FAQ 3.1 point 2 has sunk in:
One memory allocation is made for the largest group only, then that memory is reused for the other groups. There is very little garbage to collect.
That's one reason data.table grouping is quick. But this approach doesn't lend itself to parallelization. Parallelizing instead means copying the data to the other threads, which costs time. My understanding, though, is that `data.table` grouping is usually faster than `plyr` with `.parallel` on anyway. It depends on the computation time of the task for each group, and on whether that compute time can be easily reduced. Moving the data around often dominates (when benchmarking 1 or 3 runs of large data tasks).
More often, so far, it's actually some gotcha that's biting in the `j` expression of `[.data.table`. For example, recently we saw poor performance from `data.table` grouping, but the culprit turned out to be `min(POSIXct)` (Aggregating in R over 80K unique ID's). Avoiding that gotcha yielded a speedup of over 50 times.
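One plausible way to sidestep this kind of gotcha is to aggregate the underlying numbers and restore the date-time class once at the end. This is a sketch under assumptions, not the fix from the linked question: the table `dt_ts` and its columns are made up for illustration, and newer data.table versions may already optimise `min()` in `j` internally, so profile before relying on it.

```r
library(data.table)

set.seed(2L)
origin <- as.POSIXct("2013-01-01", tz = "UTC")
# Hypothetical data: 100,000 timestamps spread over 5,000 ids
dt_ts <- data.table(id = sample(5000L, 1e5, replace = TRUE),
                    ts = origin + runif(1e5, 0, 86400 * 30))

# Direct form: min() is applied to a POSIXct vector in every group
slow <- dt_ts[, .(first_ts = min(ts)), by = id]

# Workaround: take the minimum of the plain numbers, then restore the class once
fast <- dt_ts[, .(first_ts = min(as.numeric(ts))), by = id]
fast[, first_ts := as.POSIXct(first_ts, origin = "1970-01-01", tz = "UTC")]

# Both forms should agree on the values
all.equal(as.numeric(slow$first_ts), as.numeric(fast$first_ts))
```

Timing the two forms on your own data (for example with `system.time`) shows whether the difference matters in your version.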
So the mantra is: `Rprof`, `Rprof`, `Rprof`.
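In practice the mantra could look roughly like the following. It is a minimal sketch using base R's sampling profiler on made-up data; the table `dt_demo`, the group sizes and the `quantile` task are assumptions chosen only so that the run is long enough to collect samples.

```r
library(data.table)

set.seed(1L)
# Dummy data: 20,000 groups of 10 rows each (sizes chosen arbitrarily)
dt_demo <- data.table(grp = rep(seq_len(20000L), each = 10L),
                      x   = rnorm(2e5))

prof_file <- tempfile(fileext = ".out")
Rprof(prof_file, interval = 0.01)   # start the sampling profiler
res <- dt_demo[, .(q = quantile(x, 0.9)), by = grp]
Rprof(NULL)                         # stop profiling

prof <- summaryRprof(prof_file)
head(prof$by.self, 10)  # functions ranked by time spent in their own code
```

If the top entries are functions called from `j` rather than data.table internals, the per-group task is the place to optimise first.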
Further, point 1 from the same FAQ might be significant :
Only that column is grouped, the other 19 are ignored because data.table inspects the j expression and realises it doesn’t use the other columns.
So, `data.table` really doesn't follow the split-apply-combine paradigm at all. It works differently. split-apply-combine lends itself to parallelization but it really doesn't scale to large data.
Also see footnote 3 in the data.table intro vignette :
We wonder how many people are deploying parallel techniques to code that is vector scanning
That's trying to say "sure, parallel is significantly faster, but how long should it really take with an efficient algorithm?".
BUT if you've profiled (using `Rprof`), and the task per group really is compute intensive, then the 3 posts on datatable-help including the word "multicore" might help:
multicore posts on datatable-help
Of course there are many tasks where parallelization would be nice in data.table, and there is a way to do it. But it hasn't been done yet, since usually other factors bite, so it's been low priority. If you can post reproducible dummy data with benchmarks and Rprof results, that would help increase the priority.
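For readers on newer releases: data.table has since gained some internal multi-threading, controlled through `getDTthreads()` and `setDTthreads()`. The following is a minimal sketch, assuming an installed version that exports both functions. Which operations actually use the extra threads depends on the version, so check the package documentation rather than taking this as a guarantee. The table `dt_thr` is made up for illustration.

```r
library(data.table)

getDTthreads()            # number of threads data.table may currently use

old <- setDTthreads(1L)   # restrict to one thread; the previous setting is returned
dt_thr <- data.table(g = rep(1:100, each = 100L), x = rnorm(1e4))
dt_thr[, .(m = mean(x)), by = g]
setDTthreads(old)         # restore the earlier setting
```

Comparing timings with one thread and with the default setting is a quick way to see whether your grouping benefits at all.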

- Thanks for this clear picture, the data.table help posts are also very helpful. One thing I found parallel very helpful for in conjunction with data.table is loading files in parallel... FYI: I use POSIXct extensively with your data.table... there are many idioms I avoid, typically taking min/max, I might list them if you are interested... (for instance I never use your `last` function but `tail`...). About loading string to POSIXct I'll try `fasttime` from Simon Urbanek (again) soon as it looks MUCH quicker. I think there is a lot to gain looking closely at POSIXct+data.table or even coding your own. – statquant Feb 08 '13 at 11:29
- @statquant Thanks, yes we're interested in these comments. Feel more than free to add sections to the [data.table wiki](http://rwiki.sciviews.org/doku.php?id=packages:cran:data.table) perhaps? Seems like you have some good tips. – Matt Dowle Feb 08 '13 at 11:59
- It might be useful to note the points made [here](http://stackoverflow.com/a/9808657/1385941), especially the fact that the splitting occurs on a single core, and then the function is applied to the data in parallel -- some serious overheads! – mnel Feb 11 '13 at 22:53
- Could someone explain or point to some reference for "So the mantra is: Rprof, Rprof, Rprof." - how would that help me? I ran into this post because I am calculating some rolling means using rollapply on a data.table with 87 million records; they belong to some 1.7 million groups (a group can be present from 1 to 36 times) - and it is taking a long time - just trying to learn something new – user1617979 May 08 '15 at 14:37
- @user1617979 Please type `?Rprof` at your R prompt and give it a try. – Matt Dowle May 08 '15 at 23:10
- link here `multicore posts on datatable-help` is broken @MattDowle – abhiieor May 25 '18 at 12:49
- @abhiieor What does your comment mean? What link? – Matt Dowle May 25 '18 at 16:55
- Just a quick note here... Doing parallel with data.table doesn't necessarily require copying memory. As such I've had some pretty good results with split-apply-combine. The trick is to take advantage of Unix process forking. So it becomes Fork-Apply-Combine. I am usually doing this to optimize for the compute intensity of the Apply operation. Let me know if you want me to write up an example below. – Chris J.T. Auld Nov 15 '18 at 07:32
- MattDowle - I expect that @abhiieor meant the link in your post above, which now points to a broken Nabble site. – malcook Mar 09 '19 at 00:19
I've done some tests per @matt dowle's prior mantra of Rprof, Rprof, Rprof.
What I find is that the decision to parallelize is context dependent, but the gain is likely significant. Depending on the test operations (e.g. `foo` below, which can be customized) and the number of cores utilized (I try both 8 and 24), I get different results.
Below results:
- using 8 cores, I see a 21% improvement in this example for parallelization
- using 24 cores, I see 14% improvement.
I also looked at some real-world (non-shareable) data / operations, which show a larger improvement (33% or 25%, in two different tests) from parallelizing with 24 cores.
Edit May 2018: A new set of real-world example cases is showing closer to 85% improvements from parallel operations with 1000 groups.
R> sessionInfo() # 24 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods
[8] base
other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.2 data.table_1.10.4
R> sessionInfo() # 8 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: macOS Sierra 10.12.4
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods base
other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.5 data.table_1.10.4
Example below:
library(data.table)
library(stringi)
library(microbenchmark)
set.seed(7623452L)
my_grps <- stringi::stri_rand_strings(n= 5000, length= 10)
my_mat <- matrix(rnorm(1e5), ncol= 20)
dt <- data.table(grps= rep(my_grps, each= 20), my_mat)
foo <- function(dt) {
  dt2 <- dt ## needed for .SD lock
  nr <- nrow(dt2)
  idx <- sample.int(nr, 1, replace=FALSE)
  dt2[idx,][, `:=` (
    new_var1= V1 / V2,
    new_var2= V4 * V3 / V10,
    new_var3= sum(V12),
    new_var4= ifelse(V10 > 0, V11 / V13, 1),
    new_var5= ifelse(V9 < 0, V8 / V18, 1)
  )]
  return(dt2[idx,])
}
split_df <- function(d, var) {
  base::split(d, get(var, as.environment(d)))
}
foo2 <- function(dt) {
  dt2 <- split_df(dt, "grps")
  require(parallel)
  cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
  clusterExport(cl, varlist= "foo")
  clusterExport(cl, varlist= "dt2", envir = environment())
  clusterEvalQ(cl, library("data.table"))
  dt2 <- parallel::parLapply(cl, X= dt2, fun= foo)
  parallel::stopCluster(cl)
  return(rbindlist(dt2))
}
print(parallel::detectCores()) # 8
microbenchmark(
  serial= dt[,foo(.SD), by= "grps"],
  parallel= foo2(dt),
  times= 10L
)
Unit: seconds
expr min lq mean median uq max neval cld
serial 6.962188 7.312666 8.433159 8.758493 9.287294 9.605387 10 b
parallel 6.563674 6.648749 6.976669 6.937556 7.102689 7.654257 10 a
print(parallel::detectCores()) # 24
Unit: seconds
expr min lq mean median uq max neval cld
serial 9.014247 9.804112 12.17843 13.17508 13.56914 14.13133 10 a
parallel 10.732106 10.957608 11.17652 11.06654 11.30386 12.28353 10 a
Profiling:
We can use this answer to respond more directly to @matt dowle's original comment on profiling.
As a result, we do see that the majority of compute time is handled by `base` and not `data.table`. `data.table` operations themselves are, as expected, exceptionally fast. While some might argue that this is evidence that there is no need for parallelism within `data.table`, I posit that this workflow/operation-set is not atypical. That is, it is my strong suspicion that the majority of large `data.table` aggregations involve a substantial amount of non-`data.table` code, and that this is correlated with interactive use vs development / production use. I therefore conclude that parallelism would be valuable within `data.table` for large aggregations.
library(profr)
prof_list <- replicate(100, profr::profr(dt[,foo(.SD), by= "grps"], interval = 0.002),
simplify = FALSE)
pkg_timing <- fun_timing <- vector("list", length= 100)
for (i in 1:100) {
fun_timing[[i]] <- tapply(prof_list[[i]]$time, paste(prof_list[[i]]$source, prof_list[[i]]$f, sep= "::"), sum)
pkg_timing[[i]] <- tapply(prof_list[[i]]$time, prof_list[[i]]$source, sum)
}
sort(sapply(fun_timing, sum)) # no large outliers
fun_timing2 <- rbindlist(lapply(fun_timing, function(x) {
ret <- data.table(fun= names(x), time= x)
ret[, pct_time := time / sum(time)]
return(ret)
}))
pkg_timing2 <- rbindlist(lapply(pkg_timing, function(x) {
ret <- data.table(pkg= names(x), time= x)
ret[, pct_time := time / sum(time)]
return(ret)
}))
fun_timing2[, .(total_time= sum(time),
avg_time= mean(time),
avg_pct= round(mean(pct_time), 4)), by= "fun"][
order(avg_time, decreasing = TRUE),][1:10,]
pkg_timing2[, .(total_time= sum(time),
avg_time= mean(time),
avg_pct= round(mean(pct_time), 4)), by= "pkg"][
order(avg_time, decreasing = TRUE),]
Results:
fun total_time avg_time avg_pct
1: base::[ 670.362 6.70362 0.2694
2: NA::[.data.table 667.350 6.67350 0.2682
3: .GlobalEnv::foo 335.784 3.35784 0.1349
4: base::[[ 163.044 1.63044 0.0655
5: base::[[.data.frame 133.790 1.33790 0.0537
6: base::%in% 120.512 1.20512 0.0484
7: base::sys.call 86.846 0.86846 0.0348
8: NA::replace_dot_alias 27.824 0.27824 0.0112
9: base::which 23.536 0.23536 0.0095
10: base::sapply 22.080 0.22080 0.0089
pkg total_time avg_time avg_pct
1: base 1397.770 13.97770 0.7938
2: .GlobalEnv 335.784 3.35784 0.1908
3: data.table 27.262 0.27262 0.0155
crossposted in github/data.table

Yes (though it may not be worth it, as well pointed out by @Alex W).
The following provides a simple pattern to do so. For simplicity of exposition I use an example in which it is not worth it (using the `mean` function), but it shows off the pattern.
Example:
Suppose you want to compute the mean Petal.Length by Species in the iris data-set.
You could do it pretty directly using data.table as:
as.data.table(iris)[by=Species,,.(MPL=mean(Petal.Length))]
Species MPL
1: setosa 1.462
2: versicolor 4.260
3: virginica 5.552
But, if `mean` were instead a sufficiently long-running and expensive computation (perhaps as determined by profiling, though sometimes it is just "obvious"), you may like to use `parallel::mclapply`. Since minimizing the communication with all the sub-processes mclapply spawns can greatly reduce overall computation time, instead of passing selections from the data.table to each sub-process, you want to pass just the indices of the selection. Further, by sorting the data.table first, you can pass just the range (max and min) of these indices. Like this:
> library(data.table); library(parallel) # mclapply comes from the parallel package
> o.dt<-as.data.table(iris)[order(Species)] # note: iris happens already to be ordered
> i.dt<-o.dt[,by=Species,.(irange=.(range(.I)))]
> i.dt
Species irange
1: setosa 1,50
2: versicolor 51,100
3: virginica 101,150
> result<-mclapply(seq(nrow(i.dt)),function(r) o.dt[do.call(seq,as.list(i.dt[r,irange][[1]])),.(MPL=mean(Petal.Length))])
> result
[[1]]
MPL
1: 1.462
[[2]]
MPL
1: 4.26
[[3]]
MPL
1: 5.552
> result.dt<-cbind(i.dt,rbindlist(result))[,-2]
> result.dt
Species MPL
1: setosa 1.462
2: versicolor 4.260
3: virginica 5.552
Reviewing the pattern:
- Order the input.
- Compute the index range for each group.
- Define an anonymous `function` to extract the rows comprising the group members, and perform the required computation (in this case, mean).
- Apply the function to each group using mclapply on the row indices of the index ranges.
- Use `rbindlist` to get the results as a data.table, `cbind` it to the input, and drop its index columns (unless you need to keep them around for some other reason).
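The steps above could be wrapped in a small helper along these lines. This is a sketch only: `mc_by` and its arguments are hypothetical names, not part of data.table, and the fallback to one core on Windows is there because `mclapply` relies on forking.

```r
library(data.table)
library(parallel)

# Hypothetical helper (not part of data.table): apply `fun` to each group's rows
# in forked workers, handing the workers only index ranges.
mc_by <- function(dt, by_col, fun, mc.cores = 2L) {
  if (.Platform$OS.type == "windows") mc.cores <- 1L   # no forking on Windows
  o_dt <- setorderv(copy(dt), by_col)                  # 1. order a copy of the input
  i_dt <- o_dt[, .(from = min(.I), to = max(.I)), by = by_col]  # 2. index range per group
  res <- mclapply(seq_len(nrow(i_dt)), function(r) {   # 3-4. extract rows, compute
    rows <- seq.int(i_dt$from[r], i_dt$to[r])
    fun(o_dt[rows])
  }, mc.cores = mc.cores)
  cbind(i_dt[, by_col, with = FALSE], rbindlist(res))  # 5. combine, drop the index columns
}

out <- mc_by(as.data.table(iris), "Species",
             function(d) d[, .(MPL = mean(Petal.Length))])
out
```

The function passed as `fun` receives the rows of one group as a data.table and should return a one-row data.table, so that the combined result lines up with the group labels.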
Notes:
- The final `rbindlist` is generally expensive and may be skipped depending upon your application.
ToDo:
- Convince the data.table team that this pattern is sufficiently general and useful that additional data.table indexing options should invoke it. Imagine: passing mc=TRUE would invoke this pattern, and support additional parallel options in ...
iris.dt[by=Species,,.(MPL=mean(Petal.Length)), mc=TRUE, mc.preschedule=FALSE, mc.set.seed=TRUE,...]
