When a userID appears only in the validation fold (all of that user's ratings land in validation and none in training), ALS predicts NaN for it, so RegressionEvaluator returns nan and CrossValidator cannot compare models. To work around this, replace RegressionEvaluator with MiEvaluador in the CrossValidator; a short demonstration of the nan behaviour follows the sample data below.
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from math import sqrt
from operator import add
conf = (SparkConf()
        .setMaster("local[4]")
        .setAppName("Myapp")
        .set("spark.executor.memory", "2g"))
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
dfRatings = sqlContext.createDataFrame([(0, 0, 4.0),
                                        (0, 1, 2.0),
                                        (0, 5, 5.0),
                                        (0, 4, 4.0),
                                        (1, 1, 3.0),
                                        (1, 2, 4.0),
                                        (1, 5, 5.0),
                                        (1, 4, 4.0),
                                        (2, 1, 1.0),
                                        (2, 2, 5.0)],
                                       ["user", "item", "rating"])
from pyspark.ml.evaluation import Evaluator

class MiEvaluador(Evaluator):
    '''
    RMSE evaluator that tolerates NaN predictions. When a userID appears only
    in the validation fold (none of its ratings were seen in training), ALS
    predicts NaN and RegressionEvaluator returns nan; this evaluator drops
    those rows before computing the error, so CrossValidator can still
    compare models.
    '''
    def __init__(self, predictionCol='prediction', targetCol='rating'):
        super(MiEvaluador, self).__init__()
        self.predictionCol = predictionCol
        self.targetCol = targetCol

    def _evaluate(self, dataset):
        error = self.rmse(dataset, self.predictionCol, self.targetCol)
        print("Error: {}".format(error))
        return error

    def isLargerBetter(self):
        # RMSE: lower is better, so CrossValidator must minimise it.
        return False

    @staticmethod
    def rmse(dataset, predictionCol, targetCol):
        # Drop the rows with NaN predictions (users absent from the training
        # fold) and average over the rows actually scored. DataFrame.map was
        # removed in Spark 2.x, so go through .rdd explicitly.
        cleaned = dataset.dropna()
        return sqrt(cleaned.rdd.map(lambda x: (x[targetCol] - x[predictionCol]) ** 2)
                    .reduce(add) / float(cleaned.count()))
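# On the same illustrative 'demo' frame built above, MiEvaluador drops the
# NaN row and returns a finite error instead of nan:
# sqrt((4.0 - 3.5) ** 2 / 1) = 0.5.
print('MiEvaluador.rmse on NaN predictions: {}'.format(
    MiEvaluador.rmse(demo, 'prediction', 'rating')))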
lr1 = ALS()
grid1 = ParamGridBuilder().addGrid(lr1.regParam, [1.0, 0.5, 2.0]).build()
evaluator1 = MiEvaluador(predictionCol=lr1.getPredictionCol(), targetCol=lr1.getRatingCol())
cv1 = CrossValidator(estimator=lr1, estimatorParamMaps=grid1, evaluator=evaluator1, numFolds=2)
cvModel1 = cv1.fit(dfRatings)
predictions = cvModel1.transform(dfRatings)
error_cross_validation = MiEvaluador.rmse(predictions, lr1.getPredictionCol(), lr1.getRatingCol())
print('rmse with cross validation: {}'.format(error_cross_validation))
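# Optional sanity check: inspect the mean metric per grid point. avgMetrics
# is exposed on CrossValidatorModel in recent Spark releases; the hasattr
# guard is there because older versions may not expose it.
if hasattr(cvModel1, 'avgMetrics'):
    for params, metric in zip(grid1, cvModel1.avgMetrics):
        print('regParam: {}, mean rmse over folds: {}'.format(params[lr1.regParam], metric))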
error_models = []
for reg_param in (1.0, 0.5, 2.0):
    lr = ALS(regParam=reg_param)
    model = lr.fit(dfRatings)
    error = MiEvaluador.rmse(model.transform(dfRatings), lr.getPredictionCol(), lr.getRatingCol())
    error_models.append(error)
    print('reg_param: {}, rmse: {}'.format(reg_param, error))
import numpy as np

# CrossValidator refits the winning regParam on the full dataset, so its
# RMSE on dfRatings should match the best of the three standalone models.
assert np.isclose(min(error_models), error_cross_validation)