How to print the decision path of a specific sample in a Spark DataFrame?
Spark Version: '2.3.1'
The code below prints the decision path of the whole model. How can I make it print the decision path of a specific sample instead? For example, the decision path of the row where tagvalue equals 2.
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler
import findspark
findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import monotonically_increasing_id, col, row_number
from pyspark.sql.window import Window
# Local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .appName('demo')
    .master('local[*]')
    .getOrCreate()
)

# Tiny four-row toy data set; 'label' is the class to predict.
data = pd.DataFrame({
    'ball': [0, 1, 2, 3],
    'keep': [4, 5, 6, 7],
    'hall': [8, 9, 10, 11],
    'fall': [12, 13, 14, 15],
    'mall': [16, 17, 18, 10],
    'label': [21, 31, 41, 51],
})

# Add a consecutive 1-based "tagvalue" to every row: first attach a
# monotonically increasing id, then number the rows in that order.
df = spark.createDataFrame(data).withColumn(
    "mono_ID", monotonically_increasing_id()
)
w = Window.orderBy("mono_ID")
tag_column = row_number().over(w).alias("tagvalue")
df = df.select(tag_column, col("*"))

# Assemble the feature vector (note: 'mall' is not part of it) and fit a
# decision tree classifier on it.
feature_columns = ['ball', 'keep', 'hall', 'fall']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
dtc = DecisionTreeClassifier(featuresCol='features', labelCol='label')
pipeline = Pipeline(stages=[assembler, dtc]).fit(df)

# Score the training data and show the prediction for the row tagged 2.
transformed_pipeline = pipeline.transform(df)
result = transformed_pipeline.where(col("tagvalue") == 2)
result.select('tagvalue', 'prediction').show()
+--------+----------+
|tagvalue|prediction|
+--------+----------+
|       2|      31.0|
+--------+----------+
The above prints the prediction for tagvalue 2. Now I would like the decision path in the algorithm that led to that answer for that tag value, rather than the whole model.
I am aware of the following, but that prints the decision path of the whole model rather than that of a specific sample.
# The second pipeline stage is the fitted decision tree model.
ml_pipeline = pipeline.stages[1]
# toDebugString is a property holding a textual dump of the whole tree.
# Print it explicitly: a bare expression shows nothing when run as a script.
print(ml_pipeline.toDebugString)
The equivalent of that exists in scikit-learn; what is the equivalent in Spark?
Update 1:
If you run the following code with scikit-learn, it prints the decision path for that specific sample. Here is a snippet taken straight from the scikit-learn website.
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier
# Train a small decision tree on the iris data set.
iris = load_iris()
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

estimator = DecisionTreeClassifier(max_leaf_nodes=3, random_state=0)
estimator.fit(X_train, y_train)

# Arrays describing the fitted tree; `feature` and `threshold` are indexed
# by node id below.
n_nodes = estimator.tree_.node_count
children_left = estimator.tree_.children_left
children_right = estimator.tree_.children_right
feature = estimator.tree_.feature
threshold = estimator.tree_.threshold

# First let's retrieve the decision path of each sample. The decision_path
# method returns the node indicator matrix: a non-zero element at position
# (i, j) indicates that sample i goes through node j.
node_indicator = estimator.decision_path(X_test)

# Similarly, we can also get the id of the leaf reached by each sample.
leave_id = estimator.apply(X_test)

# Now it's possible to get the tests that were used to predict a sample or
# a group of samples. First, let's do it for a single sample.
sample_id = 0
# Node ids visited by `sample_id`, sliced out of the sparse indicator matrix.
node_index = node_indicator.indices[node_indicator.indptr[sample_id]:
                                    node_indicator.indptr[sample_id + 1]]

print('Rules used to predict sample %s: ' % sample_id)
for node_id in node_index:
    # Skip the leaf: it carries no split test (its feature/threshold entries
    # are placeholders, -2 here). NOTE: this condition used to be `!=`, which
    # skipped every tested node and printed only the leaf, producing a
    # meaningless "X_test[0, -2] ... > -2.0" line.
    if leave_id[sample_id] == node_id:
        continue

    # Which side of the split did the sample fall on?
    if X_test[sample_id, feature[node_id]] <= threshold[node_id]:
        threshold_sign = "<="
    else:
        threshold_sign = ">"

    print("decision id node %s : (X_test[%s, %s] (= %s) %s %s)" %
          (node_id,
           sample_id,
           feature[node_id],
           X_test[sample_id, feature[node_id]],
           threshold_sign,
           threshold[node_id]))
The output will look like this:
Rules used to predict sample 0: decision id node 4 : (X_test[0, -2] (= 5.1) > -2.0)