2

I'm trying to tune the parameters of an ALS but always choose the first parameter as best option

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from math import sqrt

from operator import add

conf = (SparkConf()
         .setMaster("local[4]")
         .setAppName("Myapp")
         .set("spark.executor.memory", "2g"))
sc = SparkContext(conf = conf)

sqlContext = SQLContext(sc)
def computeRmse(data):
    return (sqrt(data.map(lambda x: (x[2] - x[3]) ** 2).reduce(add) / float(data.count())))

dfRatings = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
                                 ["user", "item", "rating"])

lr1 = ALS()
grid1 = ParamGridBuilder().addGrid(lr1.regParam, [1.0,0.005,2.0]).build()
evaluator1 = RegressionEvaluator(predictionCol=lr1.getPredictionCol(),labelCol=lr1.getRatingCol(), metricName='rmse')
cv1 = CrossValidator(estimator=lr1, estimatorParamMaps=grid1, evaluator=evaluator1, numFolds=2)
cvModel1 = cv1.fit(dfRatings)
a=cvModel1.transform(dfRatings)
print ('rmse with cross validation: {}'.format(computeRmse(a)))

for reg_param in (1.0,0.005,2.0):
    lr = ALS(regParam=reg_param)
    model = lr.fit(dfRatings)
    print ('reg_param: {}, rmse: {}'.format(reg_param,computeRmse(model.transform(dfRatings))))

Output:
rmse with cross validation: 1.1820489116858794
reg_param: 1.0, rmse: 1.1820489116858794
reg_param: 0.005, rmse: 0.001573816765686575
reg_param: 2.0, rmse: 2.1056964491942787

Any help?

Thanks in advance,

3 Answers3

2

Putting aside other issues you simply don't use enough data to perform meaningful cross validation and evaluation. As I explained and illustrated in Spark ALS predictAll returns empty ALS cannot provide predictions when either user or item are missing from the training set.

It means that each split during cross validation will have undefined predictions and overall evaluation will be undefined. Because of that CrossValidator will return the first possible model because all models you train are equally bad from its perspective.

Community
  • 1
  • 1
zero323
  • 322,348
  • 103
  • 959
  • 935
  • Thank you, I would like to add that when a userID is predicted when it is not already trained (all userID data is used on validation group and none of them to train), prediction is nan, so RegressionEvaluator returns Nan. To solve this we must change RegressionEvaluator by MiValidacion. Example: [CostumizedValidator](https://gist.github.com/pvalienteverde/e9e610665fe7592d6dbbb2988e83b394) – pedro valiente verde Jul 17 '16 at 14:18
1

In your CrossValidator, you fix the number of folds to be 1. However, the parameter numFolds must be >=2. Using only one fold defeats with the idea of separation into train and test set.

Christian Hirsch
  • 1,996
  • 12
  • 16
0

I implemented a Pipeline solution, where I added a custom transformer to the final stage of my pipeline, so that nan predictions would be dropped. Note, that this implementation is for Spark < 2.2.0, because the keyword coldStartStrategy, was not introduced. Therefore, if you're using Spark==2.2.0, then you wouldn't need the additional stage.

First, I introduce the custom transformer that applies the nan drops.

from pyspark.ml import Transformer

class DropNAPredictions(Transformer):
    def _transform(self, predictedDF):
        nonNullDF = predictedDF.dropna(subset=['prediction', ])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

Now I can build my pipeline and train using cross validation:

dropna = DropNAPredictions()

als = ALS(maxIter=10, userCol="player", itemCol="item", ratingCol="rating", implicitPrefs=False)

pipeline = Pipeline(stages=[als, dropna])
paramGrid = ParamGridBuilder().addGrid(als.regParam, [0.1, 0.05]) \
    .addGrid(als.rank, [1, 3]) \
    .build()

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=RegressionEvaluator(labelCol="rating"),
                    numFolds=3)

cvModel = cv.fit(training)

A note on persistence: The pipeline can't be saved due to the custom transformer. There's a post that discusses options for serializing custom transformers but I haven't gone down that rabbit hole to hack at a solution. As a temporary solution, you can serialize just the ALS model itself, and then later rebuild the pipeline by adding the custom transformer to the pipeline.

bestPipeline = cvModel.bestModel
bestModel = bestPipeline.stages[0]  # extracts the ALS model
bestModel.save("s2s_als_stage")

from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.recommendation import ALSModel

mymodel = ALSModel.load('s2s_als_stage')
pipeline = PipelineModel(stages=[mymodel, dropna])  # dropna is the custom transformer
pred_test = pipeline.transform(test)  # score test data
Scratch'N'Purr
  • 9,959
  • 2
  • 35
  • 51