Created
June 5, 2015 00:11
-
-
Save ololobus/2910b69bb13b1a5f2767 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env spark-submit | |
import operator | |
import random | |
import math | |
import os | |
from pymongo import MongoClient | |
from pyspark.sql import SQLContext, Row | |
from pyspark.sql.functions import UserDefinedFunction | |
from pyspark.sql.types import DoubleType | |
from pyspark import SparkConf, SparkContext | |
def calculate_pearson(uid, target_uid):
    """Return the Pearson correlation of the current user with ``target_uid``.

    The "current" user is not taken from ``uid`` (the body never reads it; the
    parameter is kept so existing callers keep working).  It is described by
    module-level names that the main loop assigns before calling this
    function:

    * ``user_films``   -- set of film ids the current user has rated
    * ``user_ratings`` -- mapping film id -> rating for the current user
    * ``user_avg``     -- the current user's average rating

    Data for the target user is read from the module-level dicts
    ``users_films_ratings`` (user id -> {film id: rating}) and ``users_avg``
    (user id -> average rating).

    Only films rated by both users contribute to the sums.  The returned
    ``corrected_pearson`` is the raw correlation multiplied by
    ``min(number_of_common_films / 50, 1)``, so correlations backed by fewer
    than 50 shared films are scaled down.

    Args:
        uid: Unused; see above.
        target_uid: Id of the user to compare the current user against.

    Returns:
        A ``Row`` with fields ``user_id`` (the target id), ``pearson``,
        ``corrected_pearson`` and ``factor``.
    """
    target_ratings = users_films_ratings[target_uid]
    target_avg = users_avg[target_uid]
    # Iterating a dict yields its keys, so this intersects the film id sets.
    common_films = user_films.intersection(target_ratings)

    covariance = 0.0
    user_sq_dev = 0.0
    target_sq_dev = 0.0
    for fid in common_films:
        user_dev = user_ratings[fid] - user_avg
        target_dev = target_ratings[fid] - target_avg
        covariance += user_dev * target_dev
        user_sq_dev += user_dev ** 2
        target_sq_dev += target_dev ** 2

    factor = min(len(common_films) / 50.0, 1.0)

    try:
        pearson = covariance / math.sqrt(user_sq_dev) / math.sqrt(target_sq_dev)
    except ZeroDivisionError:
        # One of the users has zero variance over the common films (or there
        # are no common films at all): fall back to "no correlation" and log
        # the inputs.  Only ZeroDivisionError is caught so that genuine bugs
        # (missing keys, wrong types) are no longer silently swallowed.
        pearson = 0.0
        print('%s %s %s %s %s' % (target_uid, covariance, user_sq_dev,
                                  target_sq_dev, common_films))

    return Row(user_id=target_uid,
               pearson=pearson,
               corrected_pearson=pearson * factor,
               factor=factor)
def get_ratings_for_user(uid):
    """Collect the ratings given by user ``uid`` as a ``{film_id: rating}`` dict.

    Reads the module-level ``ratings`` DataFrame: its rows are filtered by
    ``user_id`` and the matching rows are collected to the driver.
    """
    rows_for_user = ratings.filter(ratings.user_id == uid)
    film_rating_pairs = rows_for_user.map(lambda row: (row.film_id, row.rating))
    return dict(film_rating_pairs.collect())
def get_ratings_for_film(fid):
    """Collect the ratings received by film ``fid`` as a ``{user_id: rating}`` dict.

    Reads the module-level ``ratings`` DataFrame: its rows are filtered by
    ``film_id`` and the matching rows are collected to the driver.
    """
    rows_for_film = ratings.filter(ratings.film_id == fid)
    user_rating_pairs = rows_for_film.map(lambda row: (row.user_id, row.rating))
    return dict(user_rating_pairs.collect())
# MongoDB client (pymongo defaults for host and port).  It has to exist before
# `logins` is read on the next line and before the per-login results are
# written back at the end of the script; with this line commented out the
# script fails with a NameError on `mongo`.
mongo = MongoClient()
# One document per student, in ascending _id order.
logins = list(mongo.npl.students.find().sort('_id', 1))

# Single local Spark context plus the SQL context used for all queries below.
conf = SparkConf() \
    .setMaster('local') \
    .setAppName('Lab 11') \
    .set('spark.locality.wait', '20000')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
# Loading data
ratings_data = sc.textFile('u.data')
films_data = sc.textFile('u.item')

# u.data is split on tabs; the first three fields are read as
# user id, film id and rating.
ratings_rdd = ratings_data \
    .map(lambda l: l.split('\t')) \
    .map(lambda p: Row(user_id=int(p[0]), film_id=int(p[1]), rating=int(p[2])))
# u.item is split on '|'; the first two fields are read as film id and name.
films_rdd = films_data \
    .map(lambda l: l.split('|')) \
    .map(lambda p: Row(id=int(p[0]), name=p[1]))

# Build each DataFrame once and register that same DataFrame as a temp table,
# instead of inferring the schema a second time through the deprecated
# SQLContext.inferSchema().
ratings = sqlContext.createDataFrame(ratings_rdd).cache()
ratings.registerTempTable('ratings')
films = sqlContext.createDataFrame(films_rdd).cache()
films.registerTempTable('films')

# The user table is derived from the distinct user ids seen in the ratings.
users_rdd = sqlContext.sql("SELECT DISTINCT user_id FROM ratings") \
    .map(lambda u: Row(id=u.user_id))
users = sqlContext.createDataFrame(users_rdd).cache()
users.registerTempTable('users')

sqlContext.cacheTable('ratings')
sqlContext.cacheTable('users')
sqlContext.cacheTable('films')
# Average params and caching
uids = sorted(users.select('id').map(lambda x: x.id).collect())
fids = sorted(films.select('id').map(lambda x: x.id).collect())

# Global mean rating over the whole data set.
average_rating = sqlContext.sql("SELECT AVG(rating) as average FROM ratings") \
    .take(1)[0].average

# Count each table once; every count() is a separate Spark job.
n_ratings = ratings.count()
n_users = users.count()
n_films = films.count()
average_user_ratings = n_ratings / float(n_users)
average_film_ratings = n_ratings / float(n_films)
# Fraction of the user x film matrix that actually holds a rating.
completeness = float(n_ratings) / (n_films * n_users)

# user id -> that user's average rating.
users_avg_rows = sqlContext \
    .sql("""SELECT users.id, AVG(ratings.rating) AS average
            FROM users
            INNER JOIN ratings ON users.id = ratings.user_id
            GROUP BY users.id""") \
    .cache() \
    .collect()
users_avg = dict((r.id, r.average) for r in users_avg_rows)

# The most active users (more than 50 ratings), at most one per login.
selected_users = sqlContext \
    .sql("""SELECT users.id, COUNT(ratings.film_id) AS ratings_count
            FROM users
            INNER JOIN ratings ON users.id = ratings.user_id
            GROUP BY users.id
            HAVING ratings_count > 50
            ORDER BY ratings_count DESC""") \
    .cache() \
    .take(len(logins))

# In-memory lookup tables:
#   users_films_ratings[user_id][film_id] -> rating
#   films_users_ratings[film_id][user_id] -> rating
# Both are filled from a single collect() of the ratings instead of running
# one filter + collect Spark job per user and per film.  Every id in
# uids / fids gets an entry, so films without ratings map to an empty dict.
users_films_ratings = dict((uid, {}) for uid in uids)
films_users_ratings = dict((fid, {}) for fid in fids)
for rating_row in ratings.collect():
    users_films_ratings.setdefault(rating_row.user_id, {})[rating_row.film_id] = rating_row.rating
    films_users_ratings.setdefault(rating_row.film_id, {})[rating_row.user_id] = rating_row.rating
# Predictors
#
# Baseline model: predicted rating = global mean + user offset + film offset.
# The offsets are damped averages (extra 10 / 25 in the denominator), so users
# and films with few ratings stay close to the global mean.
#
# The per-user and per-film ratings are read from the lookup tables built
# above (users_films_ratings / films_users_ratings) rather than re-querying
# Spark once per user and once per film.

# User offset: damped mean deviation of the user's ratings from the global mean.
users_pred = {}
for uid in uids:
    uratings = users_films_ratings[uid].values()
    deviation_sum = sum(uratings) - len(uratings) * average_rating
    users_pred[uid] = 1.0 / (len(uratings) + 10) * deviation_sum

# Film offset: damped mean of what is left after removing the global mean and
# the rater's own offset.
films_pred = {}
for fid in fids:
    fratings = films_users_ratings[fid]
    residual_sum = 0.0
    for uid, rating in fratings.items():
        residual_sum += rating - average_rating - users_pred[uid]
    films_pred[fid] = 1.0 / (len(fratings) + 25) * residual_sum

# predictors[film_id][user_id] -> baseline prediction for that pair.
predictors = {}
for fid in fids:
    film_offset = films_pred[fid]
    predictors[fid] = dict(
        (uid, average_rating + users_pred[uid] + film_offset) for uid in uids
    )

# clear_ratings[user_id][film_id] -> actual rating minus baseline prediction,
# or 0 when the user has not rated the film.
clear_ratings = {}
for uid in uids:
    rated = users_films_ratings[uid]
    residuals = {}
    for fid in fids:
        if fid in rated:
            residuals[fid] = rated[fid] - predictors[fid][uid]
        else:
            residuals[fid] = 0
    clear_ratings[uid] = residuals
# Film-film closeness
#
# films_s[f1][f2] is the cosine similarity of the two films' columns of
# clear_ratings (residual ratings, 0 where a user has not rated the film).
#
# Each film's norm and list of raters are computed once up front: the norm
# does not depend on the other film of the pair, and users who did not rate a
# film contribute exactly 0 to every sum, so they can be skipped.
film_raters = {}
film_norms = {}
for fid in fids:
    # uids is sorted, so raters keeps the same summation order as a full scan.
    raters = [uid for uid in uids if fid in users_films_ratings[uid]]
    norm_sq = 0.0
    for uid in raters:
        norm_sq += clear_ratings[uid][fid] ** 2
    film_raters[fid] = raters
    film_norms[fid] = math.sqrt(norm_sq)

# A film is never its own neighbour: -1000 sorts it behind every real
# similarity (which lies in [-1, 1]).
films_s = dict((fid, {fid: -1000}) for fid in fids)

# The similarity is symmetric, so each unordered pair is computed once and
# stored in both directions.
for idx, fid1 in enumerate(fids):
    raters = film_raters[fid1]
    norm1 = film_norms[fid1]
    for fid2 in fids[idx + 1:]:
        norm2 = film_norms[fid2]
        if norm1 == 0.0 or norm2 == 0.0:
            # A film without any non-zero residual has no direction to
            # compare; treat it as unrelated instead of dividing by zero.
            similarity = 0.0
        else:
            dot = 0.0
            for uid in raters:
                dot += clear_ratings[uid][fid1] * clear_ratings[uid][fid2]
            similarity = dot / norm2 / norm1
        films_s[fid1][fid2] = similarity
        films_s[fid2][fid1] = similarity
    print('----------------------> fid: %s' % fid1)

# For every film keep the ids of its 30 most similar films, most similar
# first; ties are broken by the smaller film id.
films_neighbours = {}
for fid in fids:
    ranked = sorted(films_s[fid].items(), key=lambda x: (-x[1], x[0]))
    films_neighbours[fid] = [pair[0] for pair in ranked[0:30]]
# For every student login, take the next most active user and store three
# top-10 film lists for that user under the student's 'lab11' field:
#   * user-based (Pearson neighbours),
#   * item-based using all similar films,
#   * item-based using only positively correlated films.
#
# zip() pairs logins with selected users; if fewer users qualified than there
# are logins, the remaining logins are skipped instead of failing with an
# IndexError part-way through the run.
for login, user in zip(logins, selected_users):
    lab_result = {}

    # Module-level state read by calculate_pearson().
    user_ratings = users_films_ratings[user.id]
    user_films = set(user_ratings)
    user_avg = users_avg[user.id]

    # --- User-based prediction -------------------------------------------
    other_users = users.filter(users.id != user.id).collect()
    pearson_rows = [calculate_pearson(user.id, other.id) for other in other_users]
    pearsons = sqlContext.createDataFrame(pearson_rows).cache()
    # The 30 users with the highest corrected correlation.
    top_rows = pearsons.sort(-pearsons.corrected_pearson).take(30)
    # Neighbour ratings come from the in-memory table; no Spark job needed.
    user_neighbours = [
        Row(id=row.user_id,
            corrected_pearson=row.corrected_pearson,
            ratings=users_films_ratings[row.user_id])
        for row in top_rows
    ]

    # Predict a rating for every film the user has not rated yet: the user's
    # own average plus the correlation-weighted mean deviation of the
    # neighbours who rated the film.
    films_predicted_rating = {}
    for fid in fids:
        if fid in user_films:
            continue
        weighted_sum = 0.0
        weight_total = 0.0
        for u in user_neighbours:
            if fid in u.ratings:
                weighted_sum += u.corrected_pearson * (u.ratings[fid] - users_avg[u.id])
                weight_total += abs(u.corrected_pearson)
        result = users_avg[user.id]
        if weight_total != 0.0:
            result += weighted_sum / weight_total
        films_predicted_rating[fid] = result
    user_suggested = sorted(films_predicted_rating.items(),
                            key=lambda x: (-x[1], x[0]))[0:10]

    # --- Item-based prediction -------------------------------------------
    # NOTE(review): unlike the user-based branch this scores every film,
    # including ones the user has already rated -- confirm that is intended.
    user_films_ratings = {}
    user_films_ratings_positive = {}
    user_residuals = clear_ratings[user.id]
    for fid in fids:
        baseline = predictors[fid][user.id]
        sum_all = 0.0
        weight_all = 0.0
        sum_positive = 0.0
        weight_positive = 0.0
        for nfid in films_neighbours[fid]:
            if nfid not in user_films:
                continue
            similarity = films_s[fid][nfid]
            sum_all += similarity * user_residuals[nfid]
            weight_all += abs(similarity)
            if similarity > 0:
                sum_positive += similarity * user_residuals[nfid]
                weight_positive += abs(similarity)
        rtg = baseline
        if weight_all != 0.0:
            rtg += sum_all / weight_all
        rtg_positive = baseline
        if weight_positive != 0.0:
            rtg_positive += sum_positive / weight_positive
        user_films_ratings[fid] = rtg
        user_films_ratings_positive[fid] = rtg_positive

    # --- Store the result -------------------------------------------------
    lab_result['user_id'] = user.id
    # Read the id by name: a positional index into a keyword-built Row depends
    # on how pyspark orders the fields.
    lab_result['pearson_neighbours'] = [u.id for u in user_neighbours]
    lab_result['pearson_top10'] = [pair[0] for pair in user_suggested]
    lab_result['predicators_top10'] = [
        pair[0]
        for pair in sorted(user_films_ratings.items(),
                           key=lambda x: (-x[1], x[0]))[0:10]
    ]
    lab_result['predicators_positive_top10'] = [
        pair[0]
        for pair in sorted(user_films_ratings_positive.items(),
                           key=lambda x: (-x[1], x[0]))[0:10]
    ]

    mongo.npl.students.update({'_id': login['_id']},
                              {'$set': {'lab11': lab_result}},
                              upsert=False, multi=False)
    print('%s %s' % (login['_id'], lab_result))
    print('\n')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment