
I am trying to build a simple custom Estimator in PySpark MLlib. I have read here that it is possible to write a custom Transformer, but I am not sure how to do it for an Estimator. I also don't understand what @keyword_only does and why I need so many setters and getters. Scikit-learn seems to have proper documentation for custom models (see here), but PySpark doesn't.

Pseudocode of an example model:

class NormalDeviation:
    def __init__(self, threshold=3):
        self.threshold = threshold

    def fit(self, x, y=None):
        self.model = {'mean': x.mean(), 'std': x.std()}

    def predict(self, x):
        return (x - self.model['mean']) > self.threshold * self.model['std']

    def decision_function(self, x):  # does ml-lib support this?
        ...
Hanan Shteingart

2 Answers


Generally speaking there is no documentation because, as of Spark 1.6 / 2.0, most of the related API is not intended to be public. This should change in Spark 2.1.0 (see SPARK-7146).

The API is relatively complex because it has to follow specific conventions in order to make a given Transformer or Estimator compatible with the Pipeline API. Some of these methods may be required for features like reading and writing or grid search. Others, like keyword_only, are just simple helpers and not strictly required.
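
As a quick illustration of the latter, here is a minimal, hypothetical sketch of what @keyword_only does in recent PySpark versions: it rejects positional arguments and stores the keyword arguments that were actually passed in self._input_kwargs, so they can be forwarded to setParams in one call (KeywordOnlyDemo is a made-up class, not part of any API):

from pyspark import keyword_only

class KeywordOnlyDemo(object):

    @keyword_only
    def __init__(self, foo=None, bar=1.0):
        # The decorator stored only the explicitly passed keyword
        # arguments in self._input_kwargs.
        print(self._input_kwargs)

KeywordOnlyDemo(foo="x")   # prints {'foo': 'x'}
KeywordOnlyDemo("x")       # raises TypeError - positional arguments are rejected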

Assuming you have defined the following mix-ins for the mean parameter:

from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):

    mean = Param(Params._dummy(), "mean", "mean", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasMean, self).__init__()

    def setMean(self, value):
        return self._set(mean=value)

    def getMean(self):
        return self.getOrDefault(self.mean)

the standard deviation parameter:

class HasStandardDeviation(Params):

    standardDeviation = Param(Params._dummy(),
        "standardDeviation", "standardDeviation", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasStandardDeviation, self).__init__()

    def setStddev(self, value):
        return self._set(standardDeviation=value)

    def getStddev(self):
        return self.getOrDefault(self.standardDeviation)

and the threshold:

class HasCenteredThreshold(Params):

    centeredThreshold = Param(Params._dummy(),
            "centeredThreshold", "centeredThreshold",
            typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasCenteredThreshold, self).__init__()

    def setCenteredThreshold(self, value):
        return self._set(centeredThreshold=value)

    def getCenteredThreshold(self):
        return self.getOrDefault(self.centeredThreshold)
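
As a quick sanity check of what these mix-ins provide (a purely illustrative snippet; MeanOnly is a made-up class): the Param is registered on the class, and the setter/getter read and write the instance's param map, which is also what explainParams() and the Pipeline machinery work with:

class MeanOnly(HasMean):
    pass

m = MeanOnly().setMean(3.0)
print(m.getMean())        # 3.0
print(m.explainParams())  # roughly: mean: mean (current: 3.0)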

With these mix-ins in place, you could create a basic Estimator as follows:

from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable 
from pyspark import keyword_only  

class NormalDeviation(Estimator, HasInputCol, 
        HasPredictionCol, HasCenteredThreshold,
        DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        super(NormalDeviation, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol`.
        """
        return self._set(predictionCol=value)

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        kwargs = self._input_kwargs
        return self._set(**kwargs)        
        
    def _fit(self, dataset):
        c = self.getInputCol()
        mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
        return NormalDeviationModel(
            inputCol=c, mean=mu, standardDeviation=sigma, 
            centeredThreshold=self.getCenteredThreshold(),
            predictionCol=self.getPredictionCol())


class NormalDeviationModel(Model, HasInputCol, HasPredictionCol,
        HasMean, HasStandardDeviation, HasCenteredThreshold,
        DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None,
                mean=None, standardDeviation=None,
                centeredThreshold=None):
        super(NormalDeviationModel, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)  

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None,
                mean=None, standardDeviation=None,
                centeredThreshold=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)           

    def _transform(self, dataset):
        x = self.getInputCol()
        y = self.getPredictionCol()
        threshold = self.getCenteredThreshold()
        mu = self.getMean()
        sigma = self.getStddev()

        return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)    

Credits to Benjamin-Manns for the use of DefaultParamsReadable and DefaultParamsWritable, which are available in PySpark >= 2.3.0.

Finally, it could be used as follows:

df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])

normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0)
model  = Pipeline(stages=[normal_deviation]).fit(df)

model.transform(df).show()
## +---+----+----------+
## | id|   x|prediction|
## +---+----+----------+
## |  1| 2.0|     false|
## |  2| 3.0|     false|
## |  3| 0.0|     false|
## |  4|99.0|      true|
## +---+----+----------+
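
Since the stages mix in DefaultParamsReadable / DefaultParamsWritable, the fitted pipeline can be persisted and reloaded as well. A minimal sketch (the path is hypothetical, and the custom classes must be importable when the model is loaded):

from pyspark.ml import PipelineModel

model.write().overwrite().save("/tmp/normal_deviation_pipeline")
reloaded = PipelineModel.load("/tmp/normal_deviation_pipeline")
reloaded.transform(df).show()
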
zero323
  • Thanks! So the state of an Estimator is considered a parameter as well? – Hanan Shteingart May 18 '16 at 11:11
  • Do you mean tuned parameters of the estimator as a Param for the Model? If so, it is convenient to design it this way, but it is not a hard requirement for a basic implementation. – zero323 May 24 '16 at 21:08
  • Ok, any hope to get some advice about how to persist custom steps like this? – Evan Zamir Aug 13 '16 at 00:02
  • This is a very useful example. But what if your transformer/model has parameters that are specific to it rather than to the estimator? How do you pass such parameters to the model once it's a stage within a pipeline? I don't want to pass these parameters into the estimator first when they have nothing to do with it. I've asked this question [here...](https://stackoverflow.com/questions/48642867/how-to-set-parameters-for-a-custom-pyspark-transformer-once-its-a-stage-in-a-fi) – snark Feb 07 '18 at 10:24
  • Thanks @zero323 - is there any update on this? I hate this syntax where you need to inherit from each Param (an Estimator is not a Param, so it should not inherit from it...) – Hanan Shteingart Mar 05 '18 at 20:52
  • @HananShteingart Both `Estimator` and `Transformer`, as well as other constructs (like Evaluators), are Params (`issubclass(Estimator, Params)` - note the plural; Param is a related but different entity), and any built-in Estimator is a subclass of Params. This is a direct translation of the Scala API, which follows the same structure (see [known subclasses](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.param.Params)). It is directly tied to the `MLRead[er|able]` / `MLWrit[er|able]` interfaces and the design of the Pipeline API (primarily setters and getters). – 10465355 Dec 21 '18 at 20:22

I disagree with @Shteingart's solution, as it creates members at class level and even mixes them with instance ones. This will lead to issues if you create several HasMean instances. Why not use the (imho) correct approach with instance variables? The same holds for the other code samples.

from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):
    def __init__(self):
        super(HasMean, self).__init__()
        self.mean = Param(self, "mean", "mean", typeConverter=TypeConverters.toFloat)

    def setMean(self, value):
        return self.set(self.mean, value)

    def getMean(self):
        return self.getOrDefault(self.mean)
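
A brief, purely illustrative usage sketch of this variant: each HasMean instance owns its own Param object, and values are stored per instance:

m1 = HasMean()
m1.setMean(1.0)

m2 = HasMean()
m2.setMean(2.0)

print(m1.getMean(), m2.getMean())  # 1.0 2.0
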
gilgamash