I am building a recommender system in Spark. I have been able to run and evaluate the algorithm on the dataset with hand-picked hyperparameter values, but I want to automate this by letting the CrossValidator estimator choose from a grid of hyperparameter values. I wrote the following function for that:
def recommendation(train):
    """ This function trains a collaborative filtering
    algorithm on a ratings training data
    We use a Cross Validator and Grid Search to find the right hyper-parameter values
    Param:
        train----> training data
    TUNING PARAMETERS:
        alpha----> Alpha value to calculate the confidence matrix (only for implicit datasets)
        rank-----> no. of latent factors of the resulting X, Y matrix
        reg------> regularization parameter for penalising the X, Y factors
    Returns:
        model-> ALS model object
    """
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml.recommendation import ALS

    alsImplicit = ALS(implicitPrefs=True)
    #model=als.fit(train)

    paramMapImplicit = ParamGridBuilder() \
        .addGrid(alsImplicit.rank, [20, 120]) \
        .addGrid(alsImplicit.maxIter, [10, 15]) \
        .addGrid(alsImplicit.regParam, [0.01, 1.0]) \
        .addGrid(alsImplicit.alpha, [10.0, 40.0]) \
        .build()

    evaluator=BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="rating",metricName="areaUnderROC")

    # Build the recommendation model using ALS on the training data
    #als = ALS(rank=120, maxIter=15, regParam=0.01, implicitPrefs=True)
    #model = als.fit(train)

    cvEstimator= CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluator)

    cvModel=cvEstimator.fit(train)

    return cvModel,evaluator
The problem is that calling this function gives the following error:
# Running the ALS function to train the data
model,evaluator=recommendation(train)
---------------------------------------------------------------------------
IllegalArgumentException Traceback (most recent call last)
<ipython-input-21-ea5de889f984> in <module>()
1 # Running the ALS function to train the data
2
----> 3 model,evaluator=recommendation(train)
<ipython-input-15-0fb855b138b1> in recommendation(train)
138 cvEstimator= CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluator)
139
--> 140 cvModel=cvEstimator.fit(train)
141
142 return cvModel,evaluator
/Users/i854319/spark/python/pyspark/ml/pipeline.pyc in fit(self, dataset, params)
67 return self.copy(params)._fit(dataset)
68 else:
---> 69 return self._fit(dataset)
70 else:
71 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/Users/i854319/spark/python/pyspark/ml/tuning.pyc in _fit(self, dataset)
239 model = est.fit(train, epm[j])
240 # TODO: duplicate evaluator to take extra params from input
--> 241 metric = eva.evaluate(model.transform(validation, epm[j]))
242 metrics[j] += metric
243
/Users/i854319/spark/python/pyspark/ml/evaluation.pyc in evaluate(self, dataset, params)
67 return self.copy(params)._evaluate(dataset)
68 else:
---> 69 return self._evaluate(dataset)
70 else:
71 raise ValueError("Params must be a param map but got %s." % type(params))
/Users/i854319/spark/python/pyspark/ml/evaluation.pyc in _evaluate(self, dataset)
97 """
98 self._transfer_params_to_java()
---> 99 return self._java_obj.evaluate(dataset._jdf)
100
101 def isLargerBetter(self):
/Users/i854319/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:
/Users/i854319/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
51 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
52 if s.startswith('java.lang.IllegalArgumentException: '):
---> 53 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
54 raise
55 return deco
IllegalArgumentException: u'requirement failed: Column prediction must be of type org.apache.spark.mllib.linalg.VectorUDT@f71b0bce but was actually FloatType.'
This is expected, since BinaryClassificationEvaluator expects a Vector column (of probabilities) for the prediction values, while cvModel.bestModel.transform(data) produces float values for prediction.
When I was testing hyperparameter values manually, I was able to convert those values to DenseVector format in a separate function, shown below:
def calcEval(testDF,predictions,evaluator):
    """ This function checks the evaluation metric for the recommendation algorithm
    testDF-> Validation or Test data to check the evaluation metric on
    """
    from pyspark.sql.functions import udf
    from pyspark.mllib.linalg import VectorUDT, DenseVector
    from pyspark.sql.types import DoubleType
    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    #predictions=model.transform(testDF)
    #print "Total Count of the predictions data is {}".format(predictions.count())

    ## Converting the Data Type of the Rating and Prediction column
    as_prob = udf(lambda x: DenseVector([1-x,x]), VectorUDT())
    predictions=predictions.withColumn("prediction", as_prob(predictions["prediction"]))

    # Converting the Rating column to DoubleType()
    #predictions=predictions.withColumn("rating", predictions["rating"].cast(DoubleType()))
    predictions.show(5)

    # Calculating the AUC
    print evaluator.getMetricName(), "The AUC of the Model is {}".format(evaluator.evaluate(predictions))
    print "The AUC under PR curve is {}".format(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"}))
With the cross-validator, however, the hyperparameters are chosen by evaluating each candidate on the validation folds inside the CrossValidator class itself, so I am not sure how to change the data type of the predictions while the CrossValidator is running.
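One possible direction, given here only as an untested sketch, is to wrap the evaluator so that the conversion happens inside evaluate(), which is the method CrossValidator calls on every fold. The names as_prob_pair and make_vector_evaluator are hypothetical helpers, not part of pyspark. The sketch assumes the Spark 1.6-era API seen in the traceback (pyspark.mllib.linalg, and a pyspark.ml.evaluation.Evaluator base class with _evaluate and isLargerBetter), an active SparkContext, and predictions that are never null.

```python
def as_prob_pair(score):
    """Map a scalar score x to the two-element list [1 - x, x]."""
    score = float(score)
    return [1.0 - score, score]


def make_vector_evaluator(inner, prediction_col="prediction"):
    """Hypothetical helper: wrap `inner` so that the float prediction
    column is turned into a vector column before `inner` scores it.
    """
    # Imported locally, assuming the Spark 1.6-era module layout.
    from pyspark.ml.evaluation import Evaluator
    from pyspark.mllib.linalg import DenseVector, VectorUDT
    from pyspark.sql.functions import udf

    class VectorEvaluator(Evaluator):
        def _evaluate(self, dataset):
            # Same conversion as in calcEval, applied per evaluation call.
            to_vector = udf(lambda x: DenseVector(as_prob_pair(x)), VectorUDT())
            converted = dataset.withColumn(
                prediction_col, to_vector(dataset[prediction_col]))
            return inner.evaluate(converted)

        def isLargerBetter(self):
            # Defer to the wrapped evaluator's metric direction.
            return inner.isLargerBetter()

    return VectorEvaluator()
```

Under these assumptions, the wrapped evaluator would be passed to the cross-validator in place of the plain one, for example CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=make_vector_evaluator(evaluator)). I have not confirmed that this works with CrossValidator.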
Can someone guide me here?