I want each Python worker to start an R shell using rpy2. Can I do this during some sort of setup phase, similar to what I assume happens when you import a Python module that is later used in executor tasks? For example:

import numpy as np

df.mapPartitions(lambda x: np.zeros(x))

In my case I want to instead start an R shell on each executor and import R libraries, which would look something like this:

import rpy2.robjects as robjects
from rpy2.robjects.packages import importr
rlibrary = importr('testrlibrary')

df.mapPartitions(lambda x: rlibrary.rfunc(x))

But I don't want this to occur inside the call to mapPartitions, because then it would happen at the task level rather than once per executor core. That approach works, and looks like the example below, but it is not useful for me.

def model(partition):
    import rpy2.robjects as robjects
    from rpy2.robjects.packages import importr
    rlibrary = importr('testrlibrary')
    # mapPartitions expects the function to return an iterable
    return rlibrary.rfunc(partition)

df.mapPartitions(model)
zero323
retrocookie

1 Answer

Something like this should work just fine:

import rpy2.robjects as robjects
from rpy2.robjects.packages import importr

def length_(s):
    stringi = importr("stringi")
    return stringi.stri_length(s)[0]

sc.parallelize(["foo", "bar", "foobar"]).map(length_)

The R object, which represents the R interpreter, is a singleton, so it is initialized only once per Python process, and R doesn't re-import libraries that are already attached. There is some overhead from calling require multiple times, but it should be negligible compared to the cost of passing your data to and from R.

If you want something more sophisticated, you can create your own singleton module or use the Borg pattern to handle imports, but that could be overkill.
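As a rough illustration of the singleton-module idea, here is a minimal sketch. The module layout, the names `get_package` and `_importr`, and the `loader` parameter are all hypothetical and only there for illustration; `testrlibrary` and `rfunc` are the placeholders from the question. It assumes rpy2 and R are installed on every executor.

```python
# Hypothetical helper module (for example r_packages.py) shipped to the executors.
_cache = {}  # one dict per Python worker process


def _importr(name):
    # Deferred import: rpy2 (and the embedded R) is only touched
    # when a package is requested for the first time.
    from rpy2.robjects.packages import importr
    return importr(name)


def get_package(name, loader=_importr):
    """Return the wrapper for R package `name`, loading it at most once per process."""
    if name not in _cache:
        _cache[name] = loader(name)
    return _cache[name]


def model(partition, loader=_importr):
    """Partition-level function: look the package up once, then process each element."""
    rlibrary = get_package("testrlibrary", loader)
    for row in partition:
        yield rlibrary.rfunc(row)
```

Something like `rdd.mapPartitions(model)` would then pay the import cost only on the first task that runs in a given worker process, provided the workers are reused between tasks. The `loader` argument exists only so the caching logic can be exercised without R.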

> I assume this would happen when you import a python module to be used for later executor tasks

It actually depends on the configuration. By default Spark reuses Python interpreters between tasks, but this behavior can be changed (see the spark.python.worker.reuse setting).

I've provided some examples in an answer to "In Apache spark, what is the difference between using mapPartitions and combine use of broadcast variable and map". Maybe you'll find them useful.

zero323
  • See the related question here: [How can I partition pyspark RDDs holding R functions](http://stackoverflow.com/questions/34669751/) – retrocookie Jan 08 '16 at 04:54