1

Following up some data.table parallelism (1) (2) (3) I'm trying to figure it out. What's wrong with this syntax?

library(data.table)
set.seed(1234)
dt <- data.table(id= factor(sample(1L:10000L, size= 1e6, replace= TRUE)),
       val= rnorm(n= 1e6), key="id")

foo <- function(l) sum(l)

dt2 <- dt[, foo(.SD), by= "id"]

library(parallel)
cl <- makeCluster(detectCores())
dt3 <- clusterApply(cl, x= parallel:::splitRows(dt, detectCores()),
          fun=lapply, FUN= function(x,foo) {
            x[, foo(data.table:::".SD"), by= "id"]
          }, foo= foo)
stopCluster(cl)
# note that library(parallel) is annoying and you often have to do this type ("::", ":::") of exporting to the parallel package

Error in checkForRemoteErrors(val) : 4 nodes produced errors; first error: incorrect number of dimensions

cl <- makeCluster(detectCores())
dt3 <- clusterApply(cl, x= parallel:::splitRows(dt, detectCores()),
          fun=lapply, FUN= function(x,foo) {
            x <- data.table::data.table(x)
            x[, foo(data.table:::".SD"), by= "id"]
          }, foo= foo)
stopCluster(cl)

Error in checkForRemoteErrors(val) : 4 nodes produced errors; first error: object 'id' not found

I've played around with the syntax quite a bit. These two seem to be the closest I can get. And obviously something's still not right.

My real problem is similarly structured but has many more rows and I'm using a machine with 24 cores / 48 logical processors. So watching my computer use roughly 4% of it's computing power (by using only 1 core) is really annoying

Community
  • 1
  • 1
alexwhitworth
  • 4,839
  • 5
  • 32
  • 59

1 Answers1

3

You may want to evaluate Rserve solution for parallelism.

See below example build on Rserve using 2 R nodes locally in parallel. It can be distributed over remote instances also.

library(data.table)
set.seed(1234)
dt <- data.table(id= factor(sample(1L:10000L, size= 1e6, replace= TRUE)),
                 val= rnorm(n= 1e6), key="id")
foo <- function(l) sum(l)

library(big.data.table)
# start 2 R instances
library(Rserve)
port = 6311:6312
invisible(sapply(port, function(port) Rserve(debug = FALSE, port = port, args = c("--no-save"))))
# client side
rscl = rscl.connect(port = port, pkgs = "data.table") # connect and auto require packages
bdt = as.big.data.table(dt, rscl) # create big.data.table from local data.table and list of connections to R nodes
rscl.assign(rscl, "foo", foo) # assign `foo` function to nodes
bdt[, foo(.SD), by="id"][, foo(.SD), by="id"] # first query is run remotely, second locally
#          id         V1
#    1:     1  10.328998
#    2:     2  -8.448441
#    3:     3  21.475910
#    4:     4  -5.302411
#    5:     5 -11.929699
#   ---                 
# 9996:  9996  -4.905192
# 9997:  9997  -4.293194
# 9998:  9998  -2.387100
# 9999:  9999  16.530731
#10000: 10000 -15.390543

# optionally with special care
# bdt[, foo(.SD), by= "id", outer.aggregate = TRUE]

session info:

R version 3.2.3 (2015-12-10)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 14.04.4 LTS

locale:
 [1] LC_CTYPE=en_GB.UTF-8       LC_NUMERIC=C               LC_TIME=en_GB.UTF-8        LC_COLLATE=en_GB.UTF-8     LC_MONETARY=en_GB.UTF-8    LC_MESSAGES=en_GB.UTF-8    LC_PAPER=en_GB.UTF-8      
 [8] LC_NAME=C                  LC_ADDRESS=C               LC_TELEPHONE=C             LC_MEASUREMENT=en_GB.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] Rserve_1.8-5         big.data.table_0.3.3 data.table_1.9.7    

loaded via a namespace (and not attached):
[1] RSclient_0.7-3 tools_3.2.3 
jangorecki
  • 16,384
  • 4
  • 79
  • 160
  • Looks interesting--how does one properly specify ports? – alexwhitworth Mar 08 '16 at 16:59
  • @Alex You specify ports in `Rserve::Rserve()` call, then you use the same when connecting with `rscl.connect`. You could use single port and have multiple separately processed connections, but I prefer to isolate each R node on own port. – jangorecki Mar 08 '16 at 19:37
  • My question is related to the fact that `library(Rserve)` isn't particularly clearly documented. So, let's say I have 24 cores. Do I modify your code as `ports <- 6311:6324`? Something else? I tried that I got a major stall at `rscl.assign(...)` where it looked like there was a lot of ethernet activity but no CPU/RAM activity. Is that to be expected? – alexwhitworth Mar 08 '16 at 21:14
  • Regardless, thanks a lot for the help! I'm just not sure how to use this for my actual problem. – alexwhitworth Mar 08 '16 at 21:14
  • @Alex use `port = 6311` when starting Rserve, and `rep(6311, 24)` when passing port to `rscl.connect`. `rscl.assign` should not stuck when sending `foo` as functions are cheap to send. Don't expect speed up vs. single core data.table. It is mainly useful to overcome memory limit. – jangorecki Mar 08 '16 at 22:07
  • Now I'm getting a stall at `rscl.connect(...)` – alexwhitworth Mar 08 '16 at 22:25
  • @Alex if you are able to debug that Rserve is running and accepting connection and there is something wrong in handling that from `rscl.connect` you can report an issue in big.data.table repo. – jangorecki Mar 08 '16 at 22:43
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/105744/discussion-between-alex-and-jangorecki). – alexwhitworth Mar 08 '16 at 22:59