Local version of the Last.fm recommendations in Spark
#start a terminal at the folder where spark is installed
#in the command line run this to fire up a pyspark instance
./bin/pyspark
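#the pyspark shell provides a ready-made SparkContext as sc, which everything
#below relies on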
###########################
### LOADING IN THE DATA ###
###########################
#load in the file and examine
lines = sc.textFile('usersha1-artmbid-artname-plays.tsv')
type(lines)
lines.count()
#17,559,530
#split each line of the file by the tab delimiter and check the first line
data = lines.map(lambda l: l.split('\t'))
data.first()
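#as the filename suggests, each row is
#[user_sha1, musicbrainz_artist_id, artist_name, plays]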
#keep the relevant columns: user and artist name, with a constant rating of 1 as implicit feedback, and check the first line
ratings = data.map(lambda d: (d[0], d[2], 1))
ratings.first()
#the file is too big to process locally, so take a 10% sample
sample = ratings.sample(withReplacement=False, fraction=0.1, seed=123456789)
sample.count()
#1,756,476
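#the sample is reused several times below (distinct, joins, training), so
#caching it avoids recomputing the lineage each time; an optional step, not
#part of the original gist
sample.cache()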
###########################################
### REFORMATTING DATA TO SUIT MLLIB ALS ###
###########################################
#mllib als requires users and items to be integers, so create lookups
users = sample.map(lambda s: s[0]).distinct()
users.count()
#355,756
#create a unique id per user (zipWithUniqueId assigns unique but not
#necessarily consecutive ids, without the extra pass zipWithIndex needs)
users_lkp = users.zipWithUniqueId()
users_lkp.top(5)
items_lkp = sample.map(lambda s: s[1]).distinct().zipWithUniqueId()
items_lkp.count()
#104,677
#now we need to replace the user and artist names with the ids in the lookups
#this took some thinking. you can only really join pair RDDs, but we have (user, artist, rating) and (artist, id)
#so re-map the sample data to have artist as the key and (user, rating) as the value, i.e. (artist, (user, rating))
#join to items_lkp and it will match on the artist key, i.e. (artist, id)
#this produces something of the form (artist, ((user, rating), id))
#then re-map the output back to the original shape, but now as (user, id, rating)
repArtist = sample.map(lambda uar: (uar[1], (uar[0], uar[2]))) \
                  .join(items_lkp) \
                  .map(lambda j: (j[1][0][0], j[1][1], j[1][0][1]))
#repeat to switch out the users
repUser = repArtist.map(lambda uar: (uar[0], (uar[1], uar[2]))) \
                   .join(users_lkp) \
                   .map(lambda j: (j[1][1], j[1][0][0], j[1][0][1]))
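#an alternative sketch, assuming both lookups fit in driver memory: collect
#each lookup as a dict, broadcast it, and swap in the ids in a single pass.
#the names users_b, items_b and repUser_alt are illustrative, not from the
#original gist
users_b = sc.broadcast(users_lkp.collectAsMap())
items_b = sc.broadcast(items_lkp.collectAsMap())
repUser_alt = sample.map(lambda uar: (users_b.value[uar[0]], items_b.value[uar[1]], uar[2]))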
##########################
### BUILDING THE MODEL ###
##########################
#import the recommendation model from MLlib
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
#rank is the number of latent factors in the model
rank = 20
#numIterations is the number of ALS iterations to run
numIterations = 10
#generate the model (the positional 0.01 is the regularization parameter lambda_)
model = ALS.trainImplicit(repUser, rank, numIterations, 0.01)
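#trainImplicit also takes a confidence weight alpha (default 0.01); an
#equivalent, more explicit call would be
#model = ALS.trainImplicit(repUser, rank, numIterations, lambda_=0.01, alpha=0.01)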
#from the input data keep the user and the item to create a test set
testdata = repUser.map(lambda r: (r[0], r[1]))
#then use the model to generate rating predictions for the test set
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
predictions.top(5)
#join the original rating back to the prediction, keyed on (user, item)
ratesAndPreds = repUser.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
ratesAndPreds.top(5)
#then look at the accuracy of the prediction by calculating an MSE
#(the "ratings" here are all 1, so this is only a rough sanity check for an implicit model)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y) / ratesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
#Mean Squared Error = 3.87181792494e-05
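#to actually serve recommendations, spark 1.4+ exposes recommendProducts on
#the trained model. the inverse item lookup and the choice of user below are
#illustrative sketches, not part of the original gist
id_to_artist = items_lkp.map(lambda ai: (ai[1], ai[0])).collectAsMap()
some_user = repUser.first()[0]
for rec in model.recommendProducts(some_user, 10):
    print(id_to_artist[rec.product], rec.rating)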