
(see working solution below)

I want to use multidplyr to parallelize a function:

calculs.R
f <- function(x){
  return(x + 1)
}

main.R
library(dplyr)
library(multidplyr)
source("calculs.R")
d <- data.frame(a = 1:1000, b = sample(1:2, 1000, replace = TRUE))

result <- d %>%
  partition(b) %>%
  do(f(.)) %>%
  collect()

I then get:

Initialising 3 core cluster.
Error in checkForRemoteErrors(lapply(cl, recvResult)) : 
  2 nodes produced errors; first error: could not find function "f"
In addition: Warning message:
group_indices_.grouped_df ignores extra arguments 

How can I assign sourced functions to each core?

==================

Here is the working script.

The function must extract the column to update and return the result as a data frame:

calculs.R
f <- function(x){
  return(data.frame(x$a + 1))
}

The cluster must be created explicitly and the sourced function copied to each worker:

main.R
library(dplyr)
library(multidplyr)
source("calculs.R")

cl <- create_cluster(3)
set_default_cluster(cl)
cluster_copy(cl, f)

d <- data.frame(a = 1:10, b = c(rep(1, 5), rep(2, 5)))

result <- d %>%
  partition(b) %>%
  do(f(.)) %>%
  collect()
Xavier Prudent

1 Answer


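The "Initialising 3 core cluster." message shows that a cluster was started, even though you don't create one explicitly. Each worker is a separate R session, so functions and variables from your global environment have to be exported to every worker. Assuming you create the cluster yourself as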

cl <- create_cluster(3)
set_default_cluster(cl)

Then try

cluster_copy(cl, f)

This copies f to each worker, so the code running on the workers can find it.
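The same error can be reproduced with base R's parallel package: checkForRemoteErrors() in the error message comes from parallel, which the multidplyr version used here appears to build on. The sketch below swaps multidplyr for parallel directly. It writes a hypothetical stand-in for calculs.R to a temp file, shows that a fresh worker cannot find f, and then fixes it in two ways: exporting the already-sourced f with clusterExport(), or having each worker source() the same file through clusterCall(). Two workers is an arbitrary choice.

```r
library(parallel)

# Hypothetical stand-in for calculs.R, written to a temp file so the sketch is self-contained
calc_file <- tempfile(fileext = ".R")
writeLines("f <- function(x) x + 1", calc_file)
source(calc_file)  # defines f in the main session only

cl <- makeCluster(2)  # fresh worker sessions that know nothing about f

# Without exporting, the workers cannot find f (same error as in the question)
err <- tryCatch(clusterEvalQ(cl, f(1)),
                error = function(e) conditionMessage(e))

# Option 1: copy the already-sourced f from the main session to every worker
clusterExport(cl, "f", envir = environment())
res1 <- parLapply(cl, 1:4, function(x) f(x))

# Option 2: have every worker source the same file itself
clusterCall(cl, source, calc_file)
res2 <- parLapply(cl, 1:4, function(x) f(x))

stopCluster(cl)
```

The second option is convenient when calculs.R defines many helpers, since each worker picks up all of them at once instead of having them exported one by one.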

Extra

You'll likely run into another problem: your function takes x as an argument and adds 1 to it

f <- function(x){
  return(x + 1)
}

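Since do() passes each group to f as a data frame, x + 1 adds 1 to every column of that data frame, including the grouping column b, which is probably not what you want. You might want to change your function to work on the column you care about. Note that do() expects each call to return a data frame, so the result has to be wrapped in one:

f <- function(x){
  return(data.frame(a_plus_1 = x$a + 1))
}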
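A quick way to check what do() hands to f is to run the pipeline sequentially with plain dplyr before parallelizing it. The sketch below uses the hypothetical names f_df and f_vec: f_df has the data-frame-in, data-frame-out shape that do() needs, while f_vec is the vector version that fits inside mutate() instead. It assumes a dplyr version that still ships do() (superseded, but not removed, in dplyr 1.x).

```r
library(dplyr)

d <- data.frame(a = 1:10, b = rep(1:2, each = 5))

# do() passes each group as a data frame and needs a data frame back
f_df <- function(x) data.frame(a_plus_1 = x$a + 1)
res_do <- d %>% group_by(b) %>% do(f_df(.))

# A vector-in, vector-out helper fits naturally inside mutate()
f_vec <- function(x) x + 1
res_mutate <- d %>% group_by(b) %>% mutate(a_plus_1 = f_vec(a))
```

Returning a bare vector such as x$a + 1 from inside do() makes dplyr complain that the results are not data frames, which is why the working script wraps the result in data.frame().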
CPak
  • Thanks for this detailed answer CPak; more info for future readers: http://blog.aicry.com/multidplyr-dplyr-meets-parallel-processing/ – Xavier Prudent Oct 04 '17 at 14:12
  • Please note that in the latest version of multidplyr (commit a1ad225) `create_cluster()` seems to have been renamed to `new_cluster()`. As far as I know, there have also been recent deep changes in the library's backend and in the names of other functions. – Sorlac Oct 11 '19 at 14:49
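Following up on that comment, here is a rough sketch of the working script for the rewritten multidplyr, assuming version 0.1.0 or later from CRAN. In that version new_cluster() replaces create_cluster(), partition() takes the cluster explicitly, cluster_copy() takes object names as a character vector, and grouping is done with group_by() before partition(). It also uses the vector version of f together with mutate() rather than do(). Two workers is an arbitrary choice.

```r
library(dplyr)
library(multidplyr)

# Vector-in, vector-out helper, as suggested in the answer
f <- function(x) x + 1

cl <- new_cluster(2)          # two worker sessions (arbitrary choice)
cluster_library(cl, "dplyr")  # load dplyr on every worker
cluster_copy(cl, "f")         # copy f, by name, into each worker

d <- data.frame(a = 1:10, b = rep(1:2, each = 5))

result <- d %>%
  group_by(b) %>%             # rows of the same group stay on one worker
  partition(cl) %>%
  mutate(a_plus_1 = f(a)) %>%
  collect()                   # bring the pieces back to the main session
```

Rows may come back in a different order than in d, since each worker's piece is collected separately. If calculs.R defines many helpers, cluster_send(cl, source("calculs.R")) should let each worker source the file itself, assuming the file is reachable from the workers' working directory.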