15

I'm trying to tune the parameters of an ALS matrix factorization model that uses implicit data. For this, I'm trying to use pyspark.ml.tuning.CrossValidator to run through a parameter grid and select the best model. I believe my problem is in the evaluator, but I can't figure it out.

I can get this to work for an explicit data model with a regression RMSE evaluator, as follows:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql.functions import rand


conf = SparkConf() \
  .setAppName("MovieLensALS") \
  .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

dfRatings = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
                                 ["user", "item", "rating"])
dfRatingsTest = sqlContext.createDataFrame([(0, 0), (0, 1), (1, 1), (1, 2), (2, 1), (2, 2)], ["user", "item"])

alsExplicit = ALS()
defaultModel = alsExplicit.fit(dfRatings)

paramMapExplicit = ParamGridBuilder() \
                    .addGrid(alsExplicit.rank, [8, 12]) \
                    .addGrid(alsExplicit.maxIter, [10, 15]) \
                    .addGrid(alsExplicit.regParam, [1.0, 10.0]) \
                    .build()

evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

cvExplicit = CrossValidator(estimator=alsExplicit, estimatorParamMaps=paramMapExplicit, evaluator=evaluatorR)
cvModelExplicit = cvExplicit.fit(dfRatings)

predsExplicit = cvModelExplicit.bestModel.transform(dfRatingsTest)
predsExplicit.show()

When I try to do this for implicit data (let's say counts of views rather than ratings), I get an error that I can't quite figure out. Here's the code (very similar to the above):

dfCounts = sqlContext.createDataFrame([(0,0,0), (0,1,12), (0,2,3), (1,0,5), (1,1,9), (1,2,0), (2,0,0), (2,1,11), (2,2,25)],
                                 ["user", "item", "rating"])
dfCountsTest = sqlContext.createDataFrame([(0, 0), (0, 1), (1, 1), (1, 2), (2, 1), (2, 2)], ["user", "item"])

alsImplicit = ALS(implicitPrefs=True)
defaultModelImplicit = alsImplicit.fit(dfCounts)

paramMapImplicit = ParamGridBuilder() \
                    .addGrid(alsImplicit.rank, [8, 12]) \
                    .addGrid(alsImplicit.maxIter, [10, 15]) \
                    .addGrid(alsImplicit.regParam, [1.0, 10.0]) \
                    .addGrid(alsImplicit.alpha, [2.0,3.0]) \
                    .build()

evaluatorB = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="rating")
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

cv = CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluatorR)
cvModel = cv.fit(dfCounts)

predsImplicit = cvModel.bestModel.transform(dfCountsTest)
predsImplicit.show()

I tried doing this with an RMSE evaluator and I get an error. As I understand, I should also be able to use the AUC metric for the binary classification evaluator, because the predictions of the implicit matrix factorization are a confidence matrix c_ui for predictions of a binary matrix p_ui per this paper, which the documentation for pyspark ALS cites.

Using either evaluator gives me an error and I can't find any fruitful discussion about cross-validating implicit ALS models online. I'm looking through the CrossValidator source code to try to figure out what's wrong, but am having trouble. One of my thoughts is that after the process converts the implicit data matrix r_ui to the binary matrix p_ui and confidence matrix c_ui, I'm not sure what it's comparing the predicted c_ui matrix against during the evaluation stage.

Here is the error:

Traceback (most recent call last):

  File "<ipython-input-16-6c43b997005e>", line 1, in <module>
    cvModel = cv.fit(dfCounts)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\pipeline.py", line 69, in fit
    return self._fit(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\tuning.py", line 239, in _fit
    model = est.fit(train, epm[j])

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\pipeline.py", line 67, in fit
    return self.copy(params)._fit(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\wrapper.py", line 133, in _fit
    java_model = self._fit_java(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\wrapper.py", line 130, in _fit_java
    return self._java_obj.fit(dataset._jdf)

  File "C:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\sql\utils.py", line 45, in deco
    return f(*a, **kw)

  File "C:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)

etc.......

UPDATE

I tried scaling the input so it's in the range of 0 to 1 and using a RMSE evaluator. It seems to work well until I try to insert it into the CrossValidator.

The following code works. I get predictions and i get an RMSE value from my evaluator.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator


conf = SparkConf() \
  .setAppName("ALSPractice") \
  .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

# Users 0, 1, 2, 3 - Items 0, 1, 2, 3, 4, 5 - Ratings 0.0-5.0
dfCounts2 = sqlContext.createDataFrame([(0,0,5.0), (0,1,5.0),            (0,3,0.0), (0,4,0.0), 
                                        (1,0,5.0),            (1,2,4.0), (1,3,0.0), (1,4,0.0),
                                        (2,0,0.0),            (2,2,0.0), (2,3,5.0), (2,4,5.0),
                                        (3,0,0.0), (3,1,0.0),            (3,3,4.0)            ],
                                       ["user", "item", "rating"])

dfCountsTest2 = sqlContext.createDataFrame([(0,0), (0,1), (0,2), (0,3), (0,4),
                                            (1,0), (1,1), (1,2), (1,3), (1,4),
                                            (2,0), (2,1), (2,2), (2,3), (2,4),
                                            (3,0), (3,1), (3,2), (3,3), (3,4)], ["user", "item"])

# Normalize rating data to [0,1] range based on max rating
colmax = dfCounts2.select(F.max('rating')).collect()[0].asDict().values()[0]
normalize = udf(lambda x: x/colmax, FloatType())
dfCountsNorm = dfCounts2.withColumn('ratingNorm', normalize(col('rating')))

alsImplicit = ALS(implicitPrefs=True)
defaultModelImplicit = alsImplicit.fit(dfCountsNorm)
preds = defaultModelImplicit.transform(dfCountsTest2)

evaluatorR2 = RegressionEvaluator(metricName="rmse", labelCol="ratingNorm")
evaluatorR2.evaluate(defaultModelImplicit.transform(dfCountsNorm))

preds = defaultModelImplicit.transform(dfCountsTest2)

What I don't understand is why the following doesn't work. I'm using the same estimator, the same evaluator and fitting the same data. Why would these work above but not within the CrossValidator:

paramMapImplicit = ParamGridBuilder() \
                    .addGrid(alsImplicit.rank, [8, 12]) \
                    .addGrid(alsImplicit.maxIter, [10, 15]) \
                    .addGrid(alsImplicit.regParam, [1.0, 10.0]) \
                    .addGrid(alsImplicit.alpha, [2.0,3.0]) \
                    .build()

cv = CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluatorR2)
cvModel = cv.fit(dfCountsNorm)
ilyab
  • 229
  • 2
  • 8
  • Thank you for posting this question. In your edit, why are calculating the RMSE using: `evaluatorR2.evaluate(defaultModelImplicit.transform(dfCountsNorm))` Instead of `evaluatorR2.evaluate(defaultModelImplicit.transform(dfCountsTest2))` – Archimeow Jan 05 '17 at 23:32
  • 1
    is it true, while using ALS for implicit, we need to normalise the implicit data in the 0-1 range? – Hardik Gupta Jul 07 '20 at 11:16

4 Answers4

12

Ignoring technical issues, strictly speaking neither method is correct given the input generated by ALS with implicit feedback.

  • you cannot use RegressionEvaluator because, as you already know, prediction can be interpreted as a confidence value and is represented as a floating point number in range [0, 1] and label column is just an unbound integer. These values are clearly not comparable.
  • you cannot use BinaryClassificationEvaluator because even if the prediction can be interpreted as probability label doesn't represent binary decision. Moreover prediction column has invalid type and couldn't be used directly with BinaryClassificationEvaluator

You can try to convert one of the columns so input fit the requirements but this is is not really a justified approach from a theoretical perspective and introduces additional parameters which are hard to tune.

  • map label column to [0, 1] range and use RMSE.

  • convert label column to binary indicator with fixed threshold and extend ALS / ALSModel to return expected column type. Assuming threshold value is 1 it could be something like this

    from pyspark.ml.recommendation import *
    from pyspark.sql.functions import udf, col
    from pyspark.mllib.linalg import DenseVector, VectorUDT
    
    class BinaryALS(ALS):
        def fit(self, df):
            assert self.getImplicitPrefs()
            model = super(BinaryALS, self).fit(df)
            return ALSBinaryModel(model._java_obj)
    
    class ALSBinaryModel(ALSModel):
        def transform(self, df):
            transformed = super(ALSBinaryModel, self).transform(df)
            as_vector = udf(lambda x: DenseVector([1 - x, x]), VectorUDT())
            return transformed.withColumn(
                "rawPrediction", as_vector(col("prediction")))
    
    # Add binary label column
    with_binary = dfCounts.withColumn(
        "label_binary", (col("rating") > 0).cast("double"))
    
    als_binary_model = BinaryALS(implicitPrefs=True).fit(with_binary)
    
    evaluatorB = BinaryClassificationEvaluator(
        metricName="areaUnderROC", labelCol="label_binary")
    
    evaluatorB.evaluate(als_binary_model.transform(with_binary))
    ## 1.0
    

Generally speaking, material about evaluating recommender systems with implicit feedbacks is kind of missing in textbooks, I suggest you take a read on eliasah's answer about evaluating these kind of recommenders.

eliasah
  • 39,588
  • 11
  • 124
  • 154
zero323
  • 322,348
  • 103
  • 959
  • 935
  • 1
    Thanks very much for your answer. I tried both of your suggestions and decided to go with the scaling and RMSE evaluation method. It seems to work well, except for when I insert into the CrossValidator function. I thought that the CrossValidator used the estimator and evaluator in the same way while automating k-fold cross-validation and iterating through parameter combinations from the grid. I updated my post above with my new code (the stuff that works and stuff that doesn't). Would you have any insight on what the CrossValidator is doing that's causing it to error out on the fit statement? – ilyab May 18 '16 at 22:31
  • 1
    It is not related to the evaluator strategy. It simply (or not so simply) means that it is not possible to solve the system for a given data and a set of parameters. – zero323 May 19 '16 at 00:46
  • I tried setting the parameter grid to only contain the default estimator parameter values. I'm still getting the error, so I assume it's not the parameter combination. Would you have an idea of where there might be an issue? I suppose I can set up my own nested loops to perform the iteration through parameter combinations and cross validation, but I would prefer to get this going using Spark's built-in functions. – ilyab May 19 '16 at 15:11
  • @zero323, I see that you are using `lambda x: DenseVector([1 - x, x])` to convert from a single prediction value to a vector of 2 values. I notice that the same conversion function is shown in the [official doc example](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.BinaryClassificationEvaluator). What is the logic behind this particular choice of conversion function? Is it the only function that is valid for this purpose? What does it mean? – xenocyon Jun 24 '16 at 19:37
  • 1
    @xenocyon Input can be interpreted as probability of item being relevant. So it simply creates a pair (P(item-irrelevant), P(item-relevant)) which is pretty much what BinaryClassifcationEvaluator expects (P(class(x) =0), P(class(x) = 1)) – zero323 Jun 24 '16 at 19:45
  • 1
    @zero323 I see you are getting an AUC of 1.0. Even I am getting an AUC of 1.0 on validation dataset when I transform it and then run the evaluator. Technically one does't expect AUC to be 1.0 and slightly less than good models. Is something wrong here that one gets 1.0? – Baktaawar Nov 07 '16 at 22:29
1

With implicit feedbacks we don't have user reactions to our recommendations. Thus, we cannot use precision based metrics.

In the already cited paper, the expected percentile ranking metric is used instead.

You can try to implement an Evaluator based on a similar metric in the Spark ML lib, and use it in your Cross Validation pipeline.

0

Very late to the party here, but I'll post in case anyone stumbles upon this question like I did.

I was getting a similar error when trying to use CrossValidator with an ALS model. I resolved it by setting the coldStartStrategy parameter in ALS to "drop". That is:

alsImplicit = ALS(implicitPrefs=True, coldStartStrategy="drop")

and keep the rest of the code the same.

I expect what was happening in my example is that the cross-validation splits created scenarios where I had items in the validation set that did not appear in the training set, which results in NaN prediction values. The best solution is to drop the NaN values when evaluating, as described in the documentation.

I don't know if we were getting the same error so can't guarantee this would solve OP's problem, but it's good practice to set coldStartStrategy="drop" for cross validation anyway.

Note: my error message was "Params must be either a param map or a list/tuple of param maps", which didn't seem to imply an issue with the coldStartStrategy parameter or NaN values but this solution resolved the error.

0

In order to cross validate my ALS model with implicitPrefs=True, I needed to adapt @zero323's answer slightly for pyspark==2.3.0 where I was getting the following exception:

xspy4j.Py4JException: Target Object ID does not exist for this gateway :o2733\\n\tat py4j.Gateway.invoke(Gateway.java...java:79)\\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\\n\tat java.lang.Thread.run(Thread.java:748)\\n

ALS extends JavaEstimator which provides the hooks necessary for fitting Estimators that wrap Java/Scala implementations. We need to override _create_model in BinaryALS so PySpark can keep all the Java object references straight:

import pyspark.sql.functions as F
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql.dataframe import DataFrame


class ALSBinaryModel(ALSModel):
    def transform(self, df: DataFrame) -> DataFrame:
        transformed = super().transform(df)
        as_vector = F.udf(lambda x: DenseVector([1 - x, x]), VectorUDT())
        return transformed.withColumn("rawPrediction", as_vector(F.col("prediction")))


class BinaryALS(ALS):
    def fit(self, df: DataFrame) -> ALSBinaryModel:
        assert self.getImplicitPrefs()
        return super().fit(df)

    def _create_model(self, java_model) -> ALSBinaryModel:
        return ALSBinaryModel(java_model=java_model)
Bill DeRose
  • 2,330
  • 3
  • 25
  • 36