I couldn't get @dmbaker's ingenious solution to work using Python 2 on Spark 2.2.0; I kept getting pickling errors. After several blind alleys I got a working solution by adapting their idea: the parameter values are written and read as strings, stored directly as the stop words of a StopWordsRemover.
Here's the base class you need if you want to save and load your own estimators or transformers:
from pyspark import SparkContext
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.util import Identifiable, MLWritable, JavaMLWriter, MLReadable, JavaMLReader
from pyspark.ml.wrapper import JavaWrapper, JavaParams
class PysparkReaderWriter(Identifiable, MLReadable, MLWritable):
    """
    A base class for custom pyspark Estimators and Models to support saving and loading directly
    or within a Pipeline or PipelineModel.

    Subclasses must override getParamsAsListOfStrings() and createAndInitialisePyObj().
    On save, the parameter strings, followed by a class-identifying marker, are stored as the
    stop words of a Scala StopWordsRemover. On load they are read back from it.
    """

    def __init__(self):
        super(PysparkReaderWriter, self).__init__()

    @staticmethod
    def _getPyObjIdPrefix():
        """Return the prefix that marks the last stop word as a class identifier."""
        return "_ThisIsReallyA_"

    @classmethod
    def _getPyObjId(cls):
        """Return the marker string identifying ``cls`` (the prefix followed by the class name)."""
        return PysparkReaderWriter._getPyObjIdPrefix() + cls.__name__

    def getParamsAsListOfStrings(self):
        """Return this instance's parameter values encoded as a list of strings.

        Subclasses must override this. The order must match createAndInitialisePyObj().
        """
        raise NotImplementedError("PysparkReaderWriter.getParamsAsListOfStrings() not implemented for instance: %r" % self)

    def write(self):
        """Returns an MLWriter instance for this ML instance."""
        return JavaMLWriter(self)

    def _to_java(self):
        """Build a Java StopWordsRemover that carries our parameters as its stop words."""
        # Copy the subclass's list so that appending the id never mutates the subclass's data.
        paramValuesAsStrings = list(self.getParamsAsListOfStrings())
        # Append our own type-specific id so PysparkPipelineLoader can detect this algorithm when unwrapping us.
        paramValuesAsStrings.append(self._getPyObjId())
        # Convert the parameter values to a Java array:
        sc = SparkContext._active_spark_context
        java_array = JavaWrapper._new_java_array(paramValuesAsStrings, sc._gateway.jvm.java.lang.String)
        # Create a Java (Scala) StopWordsRemover and give it the parameters as its stop words.
        _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.feature.StopWordsRemover", self.uid)
        _java_obj.setStopWords(java_array)
        return _java_obj

    @classmethod
    def _from_java(cls, java_obj):
        """Rebuild an instance of ``cls`` from a loaded StopWordsRemover's stop words.

        :param java_obj: the object returned by ``cls.read().load(path)``; must provide getStopWords().
        :raises TypeError: if the stop words do not end with this class's id marker,
            i.e. the saved object was not written by ``cls``.
        """
        stopWords = list(java_obj.getStopWords())
        if not stopWords or stopWords[-1] != cls._getPyObjId():
            raise TypeError("Saved object was not written by %s (expected id %r as last stop word)"
                            % (cls.__name__, cls._getPyObjId()))
        # Hand over the parameter strings without the trailing id.
        return cls.createAndInitialisePyObj(stopWords[:-1])

    @classmethod
    def createAndInitialisePyObj(cls, paramsAsListOfStrings):
        """Create an instance of ``cls`` from the strings produced by getParamsAsListOfStrings().

        Subclasses must override this.
        """
        raise NotImplementedError("PysparkReaderWriter.createAndInitialisePyObj() not implemented for type: %r" % cls)

    @classmethod
    def read(cls):
        """Returns an MLReader instance for our carrier class (StopWordsRemover)."""
        return JavaMLReader(StopWordsRemover)

    @classmethod
    def load(cls, path):
        """Reads an ML instance from the input path, a shortcut of `read().load(path)`."""
        swr = cls.read().load(path)
        return cls._from_java(swr)
Your own pyspark algorithm must then inherit from PysparkReaderWriter and override the getParamsAsListOfStrings() method, which saves your parameters to a list of strings. Your algorithm must also override the createAndInitialisePyObj() method, which converts a list of strings back into your parameters. Behind the scenes, the parameters are converted to and from the stop words used by StopWordsRemover.
Example estimator with 3 parameters of different types:
from pyspark.ml.param.shared import Param, Params, TypeConverters
from pyspark.ml.base import Estimator
class MyEstimator(Estimator, PysparkReaderWriter):
    """Example estimator with three parameters of different types.

    Fitting produces a MyModel whose parameters are derived from this estimator's.
    """

    def __init__(self):
        super(MyEstimator, self).__init__()

    # 3 sample parameters, deliberately of different types:
    stringParam = Param(Params._dummy(), "stringParam", "A dummy string parameter", typeConverter=TypeConverters.toString)

    def setStringParam(self, value):
        return self._set(stringParam=value)

    def getStringParam(self):
        return self.getOrDefault(self.stringParam)

    listOfStringsParam = Param(Params._dummy(), "listOfStringsParam", "A dummy list of strings.", typeConverter=TypeConverters.toListString)

    def setListOfStringsParam(self, value):
        return self._set(listOfStringsParam=value)

    def getListOfStringsParam(self):
        return self.getOrDefault(self.listOfStringsParam)

    intParam = Param(Params._dummy(), "intParam", "A dummy int parameter.", typeConverter=TypeConverters.toInt)

    def setIntParam(self, value):
        return self._set(intParam=value)

    def getIntParam(self):
        return self.getOrDefault(self.intParam)

    def _fit(self, dataset):
        """Return a MyModel with string + " World!", list + ["E", "F"] and int + 10."""
        model = MyModel()
        # Just some changes to verify we can modify the model (and also it's something we can expect to see when restoring it later):
        model.setAnotherStringParam(self.getStringParam() + " World!")
        model.setAnotherListOfStringsParam(self.getListOfStringsParam() + ["E", "F"])
        model.setAnotherIntParam(self.getIntParam() + 10)
        return model

    def getParamsAsListOfStrings(self):
        """Encode the parameters, in a fixed order, as a list of strings.

        The list parameter is comma-joined, so its elements must not contain commas.
        """
        return [
            self.getStringParam(),  # Parameter is already a string
            ','.join(self.getListOfStringsParam()),  # ...convert from a list of strings
            str(self.getIntParam()),  # ...convert from an int
        ]

    @classmethod
    def createAndInitialisePyObj(cls, paramsAsListOfStrings):
        """Create a MyEstimator from the strings produced by getParamsAsListOfStrings()."""
        # Convert back into our parameters, in the same order they were saved.
        py_obj = cls()
        py_obj.setStringParam(paramsAsListOfStrings[0])
        joined = paramsAsListOfStrings[1]
        # ''.split(',') returns [''], so an empty list has to be restored explicitly.
        # (The one-element list [''] also encodes to '' and is therefore restored as [].)
        py_obj.setListOfStringsParam(joined.split(",") if joined else [])
        py_obj.setIntParam(int(paramsAsListOfStrings[2]))
        return py_obj
Example Model (also a Transformer) which has 3 different parameters:
from pyspark.ml.base import Model
class MyModel(Model, PysparkReaderWriter):
    """Example model (also a Transformer) with three parameters of different types."""

    def __init__(self):
        super(MyModel, self).__init__()

    # 3 sample parameters, deliberately of different types:
    anotherStringParam = Param(Params._dummy(), "anotherStringParam", "A dummy string parameter", typeConverter=TypeConverters.toString)

    def setAnotherStringParam(self, value):
        return self._set(anotherStringParam=value)

    def getAnotherStringParam(self):
        return self.getOrDefault(self.anotherStringParam)

    anotherListOfStringsParam = Param(Params._dummy(), "anotherListOfStringsParam", "A dummy list of strings.", typeConverter=TypeConverters.toListString)

    def setAnotherListOfStringsParam(self, value):
        return self._set(anotherListOfStringsParam=value)

    def getAnotherListOfStringsParam(self):
        return self.getOrDefault(self.anotherListOfStringsParam)

    anotherIntParam = Param(Params._dummy(), "anotherIntParam", "A dummy int parameter.", typeConverter=TypeConverters.toInt)

    def setAnotherIntParam(self, value):
        return self._set(anotherIntParam=value)

    def getAnotherIntParam(self):
        return self.getOrDefault(self.anotherIntParam)

    def _transform(self, dataset):
        """Dummy transform: add column 'age2' = age + anotherIntParam.

        Expects ``dataset`` to have an ``age`` column.
        """
        return dataset.withColumn('age2', dataset.age + self.getAnotherIntParam())

    def getParamsAsListOfStrings(self):
        """Encode the parameters, in a fixed order, as a list of strings.

        The list parameter is comma-joined, so its elements must not contain commas.
        """
        return [
            self.getAnotherStringParam(),  # Parameter is already a string
            ','.join(self.getAnotherListOfStringsParam()),  # ...convert from a list of strings
            str(self.getAnotherIntParam()),  # ...convert from an int
        ]

    @classmethod
    def createAndInitialisePyObj(cls, paramsAsListOfStrings):
        """Create a MyModel from the strings produced by getParamsAsListOfStrings()."""
        # Convert back into our parameters, in the same order they were saved.
        py_obj = cls()
        py_obj.setAnotherStringParam(paramsAsListOfStrings[0])
        joined = paramsAsListOfStrings[1]
        # ''.split(',') returns [''], so an empty list has to be restored explicitly.
        # (The one-element list [''] also encodes to '' and is therefore restored as [].)
        py_obj.setAnotherListOfStringsParam(joined.split(",") if joined else [])
        py_obj.setAnotherIntParam(int(paramsAsListOfStrings[2]))
        return py_obj
Below is a sample test case showing how you can save and load your model. It's similar for the estimator so I omit that for brevity.
def createAModel():
    """Return a MyModel populated with sample parameter values."""
    m = MyModel()
    m.setAnotherStringParam("Boo!")
    m.setAnotherListOfStringsParam(["P", "Q", "R"])
    m.setAnotherIntParam(77)
    return m


def createAnEstimator():
    """Return a MyEstimator populated with sample parameter values.

    Used by the pipeline save/load tests; the values match their printed outputs.
    """
    e = MyEstimator()
    e.setStringParam("Hello")
    e.setListOfStringsParam(["A", "B", "C", "D"])
    e.setIntParam(42)
    return e
def testSaveLoadModel():
    """Save a sample MyModel to disk, load it back, and print its params before and after."""
    original = createAModel()
    print(original.explainParams())
    savePath = "/whatever/path/you/want"
    # A plain original.save(savePath) refuses to overwrite an existing path, so go via the writer.
    writer = original.write().overwrite()
    writer.save(savePath)
    reloaded = MyModel.load(savePath)
    print(reloaded.explainParams())


testSaveLoadModel()
Output:
anotherIntParam: A dummy int parameter. (current: 77)
anotherListOfStringsParam: A dummy list of strings. (current: ['P', 'Q', 'R'])
anotherStringParam: A dummy string parameter (current: Boo!)
anotherIntParam: A dummy int parameter. (current: 77)
anotherListOfStringsParam: A dummy list of strings. (current: [u'P', u'Q', u'R'])
anotherStringParam: A dummy string parameter (current: Boo!)
Notice that the parameters come back as unicode strings. This may or may not make a difference to the underlying algorithm you implement in _transform() (or _fit() for the estimator), so be aware of it.
Finally, because the Scala algorithm behind the scenes is really a StopWordsRemover, you need to unwrap it back into your own class when loading the Pipeline or PipelineModel from disk. Here's the utility class that does this unwrapping:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StopWordsRemover
class PysparkPipelineLoader(object):
    """
    A class to facilitate converting the stages of a Pipeline or PipelineModel
    that were saved from PysparkReaderWriter.
    """

    def __init__(self):
        super(PysparkPipelineLoader, self).__init__()

    @staticmethod
    def unwrap(thingToUnwrap, customClassList):
        """Replace disguised StopWordsRemover stages with instances of the custom classes.

        :param thingToUnwrap: a Pipeline or PipelineModel, typically freshly loaded from disk.
        :param customClassList: the PysparkReaderWriter subclasses that may appear as stages.
            They are matched by class name against the id stored in the last stop word.
        :return: ``thingToUnwrap`` itself, with its stages replaced.
        :raises TypeError: if ``thingToUnwrap`` is not a Pipeline or PipelineModel, or if a
            disguised stage names a class that is not in ``customClassList``.
        """
        if not isinstance(thingToUnwrap, (Pipeline, PipelineModel)):
            raise TypeError("Cannot recognize an object of type %s." % type(thingToUnwrap))
        prefix = PysparkReaderWriter._getPyObjIdPrefix()
        # Look up classes by name; if two share a name, the later one wins (as before).
        classesByName = {clazz.__name__: clazz for clazz in customClassList}
        isPipeline = isinstance(thingToUnwrap, Pipeline)
        # Work on a copy so the original stage list is only replaced, never mutated.
        stages = list(thingToUnwrap.getStages() if isPipeline else thingToUnwrap.stages)
        for i, stage in enumerate(stages):
            if isinstance(stage, (Pipeline, PipelineModel)):
                # Nested pipelines must be unwrapped with the same class list.
                stages[i] = PysparkPipelineLoader.unwrap(stage, customClassList)
                continue
            if not isinstance(stage, StopWordsRemover):
                continue
            stopWords = stage.getStopWords()
            # A genuine StopWordsRemover may have no stop words; only ours end with the id marker.
            if not stopWords or not stopWords[-1].startswith(prefix):
                continue
            className = stopWords[-1][len(prefix):]
            clazz = classesByName.get(className)
            if clazz is None:
                raise TypeError("I don't know how to create an instance of type: %s" % className)
            # Strip the id and initialise the custom class from the remaining stop words.
            stages[i] = clazz.createAndInitialisePyObj(stopWords[:-1])
        if isPipeline:
            thingToUnwrap.setStages(stages)
        else:
            # PipelineModel
            thingToUnwrap.stages = stages
        return thingToUnwrap
Test for saving and loading a pipeline:
def testSaveAndLoadUnfittedPipeline():
    """Save an unfitted one-stage Pipeline, reload and unwrap it, then print the stage's params."""
    original = Pipeline(stages=[createAnEstimator()])
    savePath = "/whatever/path/you/want"
    # A plain original.save(savePath) refuses to overwrite an existing path, so go via the writer.
    original.write().overwrite().save(savePath)
    loaded = Pipeline.load(savePath)
    reloaded = PysparkPipelineLoader.unwrap(loaded, [MyEstimator])
    firstStage = reloaded.getStages()[0]
    print(firstStage.explainParams())


testSaveAndLoadUnfittedPipeline()
Output:
intParam: A dummy int parameter. (current: 42)
listOfStringsParam: A dummy list of strings. (current: [u'A', u'B', u'C', u'D'])
stringParam: A dummy string parameter (current: Hello)
Test for saving and loading a pipeline model:
from pyspark.sql import Row
def make_a_dataframe(sc):
    """Build a small three-row DataFrame of Rows with name, age and height fields."""
    rows = [
        Row(name='Alice', age=5, height=80),
        Row(name='Bob', age=7, height=85),
        Row(name='Chris', age=10, height=90),
    ]
    return sc.parallelize(rows).toDF()
def testSaveAndLoadPipelineModel():
    """Fit, save, reload and unwrap a one-stage PipelineModel, then print its params and output.

    NOTE(review): relies on a global ``sc`` (e.g. the pyspark shell's SparkContext), which is
    not defined in this snippet — confirm it exists in the running environment.
    """
    source = make_a_dataframe(sc)
    fitted = Pipeline(stages=[createAnEstimator()]).fit(source)
    savePath = "/whatever/path/you/want"
    # A plain fitted.save(savePath) refuses to overwrite an existing path, so go via the writer.
    fitted.write().overwrite().save(savePath)
    loaded = PipelineModel.load(savePath)
    reloaded = PysparkPipelineLoader.unwrap(loaded, [MyModel])
    print(reloaded.stages[0].explainParams())
    transformed = reloaded.transform(source)
    transformed.show()


testSaveAndLoadPipelineModel()
Output:
anotherIntParam: A dummy int parameter. (current: 52)
anotherListOfStringsParam: A dummy list of strings. (current: [u'A', u'B', u'C', u'D', u'E', u'F'])
anotherStringParam: A dummy string parameter (current: Hello World!)
+---+------+-----+----+
|age|height| name|age2|
+---+------+-----+----+
| 5| 80|Alice| 57|
| 7| 85| Bob| 59|
| 10| 90|Chris| 62|
+---+------+-----+----+
When unwrapping a pipeline or pipeline model, you have to pass in a list of the classes that correspond to your own pyspark algorithms that are masquerading as StopWordsRemover objects in the saved pipeline or pipeline model. The last stop word in your saved object identifies your class by name. createAndInitialisePyObj() is then called to create an instance of that class and initialise its parameters from the remaining stop words.
Various refinements could be made. But hopefully this will enable you to save and load custom estimators and transformers, both inside and outside pipelines, until SPARK-17025 is resolved and available to you.