18

Is there a way to get the points on an ROC curve from Spark ML in PySpark? In the documentation I see an example for Scala but not for Python: https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html

Is that right? I can certainly think of ways to implement it but I have to imagine it’s faster if there’s a pre-built function. I’m working with 3 million scores and a few dozen models so speed matters.

desertnaut
seth127

3 Answers

21

For a more general solution that works for models besides Logistic Regression (like Decision Trees or Random Forest which lack a model summary) you can get the ROC curve using BinaryClassificationMetrics from Spark MLlib.

Note that the PySpark version doesn't implement all of the methods that the Scala version does, so you'll need to use the .call(name) function from JavaModelWrapper. It also seems that py4j doesn't support parsing scala.Tuple2 classes, so they have to be manually processed.

Example:

from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Scala version implements .roc() and .pr()
# Python: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/mllib/common.html
# Scala: https://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html
class CurveMetrics(BinaryClassificationMetrics):
    def __init__(self, *args):
        super(CurveMetrics, self).__init__(*args)

    def _to_list(self, rdd):
        points = []
        # Note this collect could be inefficient for large datasets 
        # considering there may be one probability per datapoint (at most)
        # The Scala version takes a numBins parameter, 
        # but it doesn't seem possible to pass this from Python to Java
        for row in rdd.collect():
            # Results are returned as type scala.Tuple2, 
            # which doesn't appear to have a py4j mapping
            points.append((float(row._1()), float(row._2())))
        return points

    def get_curve(self, method):
        rdd = getattr(self._java_model, method)().toJavaRDD()
        return self._to_list(rdd)

Usage:

import matplotlib.pyplot as plt

# Create a Pipeline estimator and fit on train DF, predict on test DF
model = estimator.fit(train)
predictions = model.transform(test)

# Build an RDD of (score, label) pairs, where score is the probability of class 1
preds = predictions.select('label','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['label'])))
# Returns a list of (false positive rate, true positive rate) tuples
points = CurveMetrics(preds).get_curve('roc')

plt.figure()
x_val = [x[0] for x in points]
y_val = [x[1] for x in points]
plt.title('ROC curve')
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.plot(x_val, y_val)

[Figure: ROC curve generated with PySpark BinaryClassificationMetrics]

BinaryClassificationMetrics in Scala implements several other useful methods as well:

metrics = CurveMetrics(preds)
metrics.get_curve('fMeasureByThreshold')
metrics.get_curve('precisionByThreshold')
metrics.get_curve('recallByThreshold')
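
Once the curve is available as a plain Python list of (x, y) tuples, as get_curve returns above, a summary number such as the area under the curve can be approximated locally. The following is a minimal sketch using the trapezoidal rule; the helper name trapezoid_auc and the example points are hypothetical and not part of Spark.

```python
def trapezoid_auc(points):
    """Approximate the area under a curve from (x, y) points (trapezoidal rule)."""
    pts = sorted(points)  # order by x, then by y, so segments run left to right
    area = 0.0
    for (x0, y0), (x1, y1) in zip(pts, pts[1:]):
        # Area of the trapezoid between two neighbouring points
        area += (x1 - x0) * (y0 + y1) / 2.0
    return area


# Hypothetical (FPR, TPR) points standing in for CurveMetrics(preds).get_curve('roc')
example_points = [(0.0, 0.0), (0.0, 0.5), (0.5, 1.0), (1.0, 1.0)]
print(trapezoid_auc(example_points))
```

This is only meant for a list that has already been collected to the driver; it does nothing to reduce the cost of the collect itself.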
Alex Ross
  • this code gives an error NameError: name 'points' is not defined. Any idea why? – Gun Jan 09 '20 at 10:20
  • You need to replace points, with "roc", as you are creating the list here: `roc = CurveMetrics(preds).get_curve('roc')` and also replace the variable names in the plotting function. – SummerEla Jan 21 '20 at 01:39
  • Thanks @AlexRoss this really helped! I wish I could upvote it multiple times! – frno Mar 11 '20 at 16:46
  • Is there really no way to convert scala.Tuple2s without collecting the whole RDD? This has been driving me crazy all day – Nick Resnick Jun 09 '20 at 19:47
  • I spent a day looking into it as well back around the time of this answer, and didn't find a way. But I might have missed something or the API might have changed. Agreed not ideal for large datasets. – Alex Ross Jun 09 '20 at 22:42
  • I tried this solution on Spark 3.1.2 and Azure Databricks however get exception `Exception ignored in: Traceback (most recent call last): File "/databricks/spark/python/pyspark/mllib/common.py", line 137, in __del__ self._sc._gateway.detach(self._java_model) AttributeError: 'CurveMetrics' object has no attribute '_sc'`. Anyone knows why? – johnnyasd12 Oct 22 '21 at 08:46
12

Since the ROC curve is a plot of TPR against FPR, you can extract the needed values as follows:

your_model.summary.roc.select('FPR').collect()
your_model.summary.roc.select('TPR').collect()

Where your_model could be for example a model you got from something like this:

from pyspark.ml.classification import LogisticRegression
log_reg = LogisticRegression()
your_model = log_reg.fit(df)

Now you should just plot FPR against TPR, using for example matplotlib.

P.S.

Here is a complete example for plotting the ROC curve using a model named your_model (and anything else!). I've also plotted a reference "random guess" line inside the ROC plot.

import matplotlib.pyplot as plt
plt.figure(figsize=(5,5))
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(your_model.summary.roc.select('FPR').collect(),
         your_model.summary.roc.select('TPR').collect())
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()
Andrea
  • Thank you, this is helpful. In my case though, I don't have the actual model. I have a two column rdd with probabilities and a binary label. In the scala docs, you can do `metrics = BinaryClassificationMetrics(predictionAndLabels)` and then `metrics.roc` gives you the points. But that doesn't work for pyspark. I guess this is a hail mary that the function exists somewhere else? – seth127 Oct 17 '18 at 14:49
  • Also, I am able to get _some_ of the model objects and reload them, with `from pyspark.ml.classification import LogisticRegressionModel` and then `mdl = LogisticRegressionModel.load(loc)` but when I try your call I get `RuntimeError: No training summary available for this LogisticRegressionModel`. Any thoughts on that? – seth127 Oct 17 '18 at 14:50
  • Did you "fit" the model before loading it? The summary should be there as soon as you have trained the model. If you run `mdl.hasSummary` what happens? I have another guess regarding pipelines: if you put that model as the last stage of a pipeline, you can access it with `mdl.stages[-1].summary` instead of `mdl.summary`. – Andrea Oct 17 '18 at 15:28
  • it was a model trained in another program and then saved to s3 with `mdl.write().save(s3_path)`. I guess somehow it loses the summary. I guess I just need to go back to the other program and pull out the ROC points before I save the model, and then save them as json or whatever. That's not ideal, but I guess it's the best option. Thanks for the help. – seth127 Oct 18 '18 at 13:42
  • I understand. Usually I don't use save/load, so I cannot be helpful for that aspect. I'm glad I helped you! – Andrea Oct 18 '18 at 14:08
  • BTW This is ROC on model trained on train data, not test data ROC. – qwr Nov 02 '21 at 02:31
4

To get ROC metrics for train data (trained model), we can use your_model.summary.roc which is a DataFrame with columns FPR and TPR. See Andrea's answer.

For ROC evaluated on arbitrary test data, we can pass the label and probability columns to sklearn's roc_curve to get FPR and TPR. Here we assume a binary classification problem where the y score is the predicted probability of class 1. See also "How to split Vector into columns - using PySpark" and "How to convert a pyspark dataframe column to numpy array".

Example

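from pyspark.ml.functions import vector_to_array  # available in Spark 3.0+
from sklearn.metrics import roc_curve

# lr is an unfitted classifier, e.g. pyspark.ml.classification.LogisticRegression()
model = lr.fit(train_df)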
test_df_predict = model.transform(test_df)

y_score = test_df_predict.select(vector_to_array("probability")[1]).rdd.keys().collect()
y_true = test_df_predict.select("label").rdd.keys().collect()
fpr, tpr, thresholds = roc_curve(y_true, y_score)
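
For readers who only have (score, label) pairs and want to see what such a function computes, here is a simplified pure-Python sketch of the threshold sweep behind an ROC curve. It is not sklearn's or Spark's implementation, and the name roc_points and the sample data are hypothetical. It assumes labels are 0 or 1 and that the data fits in local memory.

```python
def roc_points(y_true, y_score):
    """Return (FPR, TPR) points by sweeping a threshold from high to low scores."""
    pairs = sorted(zip(y_score, y_true), key=lambda p: p[0], reverse=True)
    n_pos = sum(1 for _, label in pairs if label == 1)
    n_neg = len(pairs) - n_pos
    if n_pos == 0 or n_neg == 0:
        raise ValueError("need at least one positive and one negative label")

    points = [(0.0, 0.0)]  # threshold above every score: nothing predicted positive
    tp = fp = 0
    for i, (score, label) in enumerate(pairs):
        if label == 1:
            tp += 1
        else:
            fp += 1
        # Emit a point only after the last item in a run of tied scores
        is_last = i == len(pairs) - 1
        if is_last or pairs[i + 1][0] != score:
            points.append((fp / n_neg, tp / n_pos))
    return points


# Hypothetical sample data
sample_true = [0, 0, 1, 1]
sample_score = [0.1, 0.4, 0.35, 0.8]
print(roc_points(sample_true, sample_score))
```

With millions of scores, a sweep like this produces up to one point per distinct score, so binning or downsampling the thresholds before plotting is likely to be worthwhile.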
qwr