import rpy2.robjects as robjects
dffunc = sc.parallelize([(0, robjects.r.rnorm), (1, robjects.r.runif)])
dffunc.collect()
This outputs:
[(0, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc28618 / R:0x26abd18>), (1, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc283d8 / R:0x26aad28>)]
While the partitioned version results in an error:
dffuncpart = dffunc.partitionBy(2)
dffuncpart.collect()
RuntimeError: ('R cannot evaluate code before being initialized.', <built-in function unserialize>)
It seems that this error means R wasn't loaded on one of the partitions, which I assume implies that the first import step was never performed there. Is there any way around this?
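One way to check that assumption would be a quick diagnostic along these lines (an untested sketch; the partition count of 4 is arbitrary), which should report whether rpy2 is already in each worker's module cache:

def rpy2_loaded(_):
    import sys
    return 'rpy2.robjects' in sys.modules

sc.parallelize(range(4), 4).map(rpy2_loaded).collect()

If the workers really are fresh Python processes, I'd expect this to return False for every element.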
EDIT 1: This second example makes me think there's a timing bug in PySpark or rpy2.
dffunc = sc.parallelize([(0, robjects.r.rnorm), (1, robjects.r.runif)]).partitionBy(2)

def loadmodel(model):
    # import on the worker, hoping this initializes R there
    import rpy2.robjects as robjects
    return model[1](2)

dffunc.map(loadmodel).collect()
This produces the same error: R cannot evaluate code before being initialized.
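My suspicion is that the failure happens while Spark is unserializing the partition data itself, i.e. before loadmodel ever runs, so the worker-side import simply comes too late. If so, one way around it might be to never ship SexpClosure objects at all: send the R function names instead and resolve them after the import (another untested sketch; robjects.r['rnorm'] is the standard rpy2 name lookup):

dffuncname = sc.parallelize([(0, "rnorm"), (1, "runif")]).partitionBy(2)

def loadmodelbyname(model):
    import rpy2.robjects as robjects
    # look the function up by name once R is initialized on this worker
    return robjects.r[model[1]](2)

dffuncname.map(loadmodelbyname).collect()

What I did try, though, is manually pickling the closures, which changes the behavior: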
import pickle
dffuncpickle = sc.parallelize([(0, pickle.dumps(robjects.r.rnorm)), (1, pickle.dumps(robjects.r.runif))]).partitionBy(2)

def loadmodelpickle(model):
    import rpy2.robjects as robjects
    import pickle
    return pickle.loads(model[1])(2)

dffuncpickle.map(loadmodelpickle).collect()
This works just as expected.
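My guess at why: Spark has to unserialize the raw SexpClosures while reading the partition, before the import inside the mapped function has initialized R, whereas the manually pickled closures are just opaque bytes to Spark and only become R objects when pickle.loads runs after the import. If that's right, the ordering can be packaged into a pair of helpers (hypothetical names, just to make the pattern explicit):

import pickle
import rpy2.robjects as robjects

def ship_r_function(rfunc):
    # pickle on the driver, where R is already initialized
    return pickle.dumps(rfunc)

def call_shipped(blob, *args):
    # importing rpy2.robjects initializes the embedded R on this worker first...
    import rpy2.robjects
    import pickle
    # ...so R is live by the time the closure is unserialized here
    return pickle.loads(blob)(*args)

rdd = sc.parallelize([(0, ship_r_function(robjects.r.rnorm)),
                      (1, ship_r_function(robjects.r.runif))]).partitionBy(2)
rdd.map(lambda kv: call_shipped(kv[1], 2)).collect()

But it still isn't clear to me whether the raw-closure case is supposed to work, or whether this is a bug.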