5

So I am building a recommender system in Spark. While I have been able to evaluate and run the algorithm on the dataset with initial hand put hyper-parameter values. I want to automate it by letting the cross-validation estimator choose from a grid of hyper parameter values. So I wrote the following function for the same

def recommendation(train):
    """ This function trains a collaborative filtering 
    algorithm on a ratings training data

    We use a Cross Validator and Grid Search to find the right hyper-parameter values



    Param: 
    train----> training data

    TUNING PARAMETERS: 
    alpha----> Alpha value to calculate the confidence matrix (only for implicit datasets)
    rank-----> no. of latent factors of the resulting X, Y matrix
    reg------> regularization parameter for penalising the X, Y factors


    Returns: 
    model-> ALS model object

    """


    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.evaluation import BinaryClassificationEvaluator 

    from pyspark.ml.recommendation import ALS

    alsImplicit = ALS(implicitPrefs=True)

    #model=als.fit(train)

    paramMapImplicit = ParamGridBuilder() \
                    .addGrid(alsImplicit.rank, [20, 120]) \
                    .addGrid(alsImplicit.maxIter, [10, 15]) \
                    .addGrid(alsImplicit.regParam, [0.01, 1.0]) \
                    .addGrid(alsImplicit.alpha, [10.0, 40.0]) \
                    .build()


    evaluator=BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="rating",metricName="areaUnderROC")

    # Build the recommendation model using ALS on the training data

    #als = ALS(rank=120, maxIter=15, regParam=0.01, implicitPrefs=True)
    #model = als.fit(train)

    cvEstimator= CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluator)

    cvModel=cvEstimator.fit(train)

    return cvModel,evaluator

The problem is when I call this function this would give the following error:

Running the ALS function to train the data

model,evaluator=recommendation(train)
---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-21-ea5de889f984> in <module>()
      1 # Running the ALS function to train the data
      2 
----> 3 model,evaluator=recommendation(train)

<ipython-input-15-0fb855b138b1> in recommendation(train)
    138     cvEstimator= CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluator)
    139 
--> 140     cvModel=cvEstimator.fit(train)
    141 
    142     return cvModel,evaluator

/Users/i854319/spark/python/pyspark/ml/pipeline.pyc in fit(self, dataset, params)
     67                 return self.copy(params)._fit(dataset)
     68             else:
---> 69                 return self._fit(dataset)
     70         else:
     71             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/Users/i854319/spark/python/pyspark/ml/tuning.pyc in _fit(self, dataset)
    239                 model = est.fit(train, epm[j])
    240                 # TODO: duplicate evaluator to take extra params from input
--> 241                 metric = eva.evaluate(model.transform(validation, epm[j]))
    242                 metrics[j] += metric
    243 

/Users/i854319/spark/python/pyspark/ml/evaluation.pyc in evaluate(self, dataset, params)
     67                 return self.copy(params)._evaluate(dataset)
     68             else:
---> 69                 return self._evaluate(dataset)
     70         else:
     71             raise ValueError("Params must be a param map but got %s." % type(params))

/Users/i854319/spark/python/pyspark/ml/evaluation.pyc in _evaluate(self, dataset)
     97         """
     98         self._transfer_params_to_java()
---> 99         return self._java_obj.evaluate(dataset._jdf)
    100 
    101     def isLargerBetter(self):

/Users/i854319/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/Users/i854319/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     51                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     52             if s.startswith('java.lang.IllegalArgumentException: '):
---> 53                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
     54             raise
     55     return deco

IllegalArgumentException: u'requirement failed: Column prediction must be of type org.apache.spark.mllib.linalg.VectorUDT@f71b0bce but was actually FloatType.'

This is expected since the BinaryClassificationEvaluator method expects the Vector of probabilities for prediction values. While cvmodel.bestModel.tranform(data) gives a float values for prediction.

Now while I am able to convert those to DenseVector format in a separate method when I was manually testing the hyper parameter values as below

def calcEval(testDF,predictions,evaluator):
    """ This function checks the evaluation metric for the recommendation algorithm

    testDF-> Validation or Test data to check the evalutation metric on

    """

    from pyspark.sql.functions import udf
    from pyspark.mllib.linalg import VectorUDT, DenseVector
    from pyspark.sql.types import DoubleType
    from pyspark.ml.evaluation import BinaryClassificationEvaluator



    #predictions=model.transform(testDF)
    #print "Total Count of the predictions data is {}".format(predictions.count())

    ## Converting the Data Type of the Rating and Prediction column

    as_prob = udf(lambda x: DenseVector([1-x,x]), VectorUDT())

    predictions=predictions.withColumn("prediction", as_prob(predictions["prediction"]))

    # Converting the Rating column to DoubleType()

    #predictions=predictions.withColumn("rating", predictions["rating"].cast(DoubleType()))    

    predictions.show(5)

    # Calculating the AUC

    print evaluator.getMetricName(), "The AUC of the Model is {}".format(evaluator.evaluate(predictions))

    print "The AUC under PR curve is {}".format(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"}))

But while using cross validator estimator since the algo chooses the right hyperparamter by testing it on the cross validated dataset within the main cross validator class, I am not sure how to change the data type of the predicted probabilities when the crossValidator Estimator is running.

Can someone guide here?

Baktaawar
  • 7,086
  • 24
  • 81
  • 149
  • 1
    Possible duplicate of [Tuning parameters for implicit pyspark.ml ALS matrix factorization model through pyspark.ml CrossValidator](http://stackoverflow.com/questions/37260902/tuning-parameters-for-implicit-pyspark-ml-als-matrix-factorization-model-through) –  Nov 11 '16 at 23:17
  • @LostInOverflow Thanks. But I am not sure I understood the answer there completely. It doesn't use the Cross validator method of Spark. I want to use Cross-Validator to get the best model. – Baktaawar Nov 12 '16 at 04:17
  • Just try it with crossvalidator. –  Nov 14 '16 at 05:10
  • 1
    I did try Cross-validator. What I am saying is that Cross_validator needs an evaluator parameter and does Evaluation on that. Since the prediction of the transform method of model is not having the data type as Vector it is giving error – Baktaawar Nov 14 '16 at 18:41

0 Answers0