Skip to content

Instantly share code, notes, and snippets.

@ololobus
Created June 5, 2015 00:11
Show Gist options
  • Save ololobus/2910b69bb13b1a5f2767 to your computer and use it in GitHub Desktop.
Save ololobus/2910b69bb13b1a5f2767 to your computer and use it in GitHub Desktop.
#!/usr/bin/env spark-submit
import operator
import random
import math
import os
from pymongo import MongoClient
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
from pyspark import SparkConf, SparkContext
def calculate_pearson(uid, target_uid):
    """Correlate the currently selected user with ``target_uid``.

    The "current" user is NOT taken from ``uid`` (the parameter is accepted
    for interface compatibility but never read).  It is described by the
    module-level names ``user_films`` (set of rated film ids),
    ``user_ratings`` (film id -> rating) and ``user_avg`` (mean rating),
    which the caller must set before calling this function.  The target
    user's data comes from ``users_films_ratings`` and ``users_avg``.

    Returns a ``Row`` with the fields ``user_id`` (the target user),
    ``pearson`` (correlation over the films both users rated), ``factor``
    (``min(common_films / 50, 1)``) and ``corrected_pearson``
    (``pearson * factor``).  When either user has zero variance over the
    common films (including the case of no common films) the correlation is
    reported as 0.0 and the inputs are printed for inspection.
    """
    target_ratings = users_films_ratings[target_uid]
    target_avg = users_avg[target_uid]
    # Iterating a dict yields its keys, so no intermediate key list is needed.
    common_films = user_films.intersection(target_ratings)

    covariance = 0.0
    user_sq_dev = 0.0
    target_sq_dev = 0.0
    for fid in common_films:
        user_dev = user_ratings[fid] - user_avg
        target_dev = target_ratings[fid] - target_avg
        covariance += user_dev * target_dev
        user_sq_dev += user_dev ** 2
        target_sq_dev += target_dev ** 2

    factor = min(len(common_films) / 50.0, 1.0)

    try:
        pearson = covariance / math.sqrt(user_sq_dev) / math.sqrt(target_sq_dev)
    except ZeroDivisionError:
        # Only a zero denominator is an expected failure here; any other
        # exception is a real bug and must not be hidden.
        pearson = 0.0
        print('%s %s %s %s %s' % (target_uid, covariance, user_sq_dev,
                                  target_sq_dev, common_films))

    return Row(user_id=target_uid, pearson=pearson,
               corrected_pearson=pearson * factor, factor=factor)
def get_ratings_for_user(uid):
    """Return ``{film_id: rating}`` for every rating given by user ``uid``.

    Reads the module-level ``ratings`` DataFrame; each call filters it and
    collects the matching rows.
    """
    rows_of_user = ratings.filter(ratings.user_id == uid)
    film_rating_pairs = rows_of_user.map(lambda row: (row.film_id, row.rating))
    return dict(film_rating_pairs.collect())
def get_ratings_for_film(fid):
    """Return ``{user_id: rating}`` for every rating received by film ``fid``.

    Reads the module-level ``ratings`` DataFrame; each call filters it and
    collects the matching rows.
    """
    rows_of_film = ratings.filter(ratings.film_id == fid)
    user_rating_pairs = rows_of_film.map(lambda row: (row.user_id, row.rating))
    return dict(user_rating_pairs.collect())
# The client must really be created: `mongo` is used right below to read the
# student list and again at the end of the script to store each result.
# MongoClient() without arguments uses pymongo's default host and port.
mongo = MongoClient()
# One document per student, in ascending _id order.
logins = list(mongo.npl.students.find().sort('_id', 1))

# Spark runs with a 'local' master under the application name 'Lab 11'.
conf = SparkConf() \
    .setMaster('local') \
    .setAppName('Lab 11') \
    .set('spark.locality.wait', '20000')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
# Loading data
ratings_data = sc.textFile('u.data')
films_data = sc.textFile('u.item')

# u.data is tab-separated; the first three fields are user id, film id, rating.
ratings_rdd = ratings_data \
    .map(lambda l: l.split('\t')) \
    .map(lambda p: Row(user_id=int(p[0]), film_id=int(p[1]), rating=int(p[2])))
# u.item is '|'-separated; the first two fields are film id and film name.
films_rdd = films_data \
    .map(lambda l: l.split('|')) \
    .map(lambda p: Row(id=int(p[0]), name=p[1]))

# Build each DataFrame once and register that same DataFrame as the SQL table.
# The deprecated sqlContext.inferSchema() used to infer every schema a second
# time and register a separate copy, so the same data ended up cached twice.
ratings = sqlContext.createDataFrame(ratings_rdd).cache()
ratings.registerTempTable('ratings')
films = sqlContext.createDataFrame(films_rdd).cache()
films.registerTempTable('films')

# The set of users is derived from the ratings themselves.
users = sqlContext.sql("SELECT DISTINCT user_id FROM ratings")
users_rdd = users.map(lambda u: Row(id=u.user_id))
users = sqlContext.createDataFrame(users_rdd).cache()
users.registerTempTable('users')

sqlContext.cacheTable('ratings')
sqlContext.cacheTable('users')
sqlContext.cacheTable('films')
# Average params and caching
uids = sorted(users.select('id').map(lambda x: x.id).collect())
fids = sorted(films.select('id').map(lambda x: x.id).collect())

average_rating = sqlContext.sql("SELECT AVG(rating) as average FROM ratings") \
    .take(1)[0].average

# Every count() is a Spark action, so run each one once and reuse the number.
ratings_total = ratings.count()
users_total = users.count()
films_total = films.count()
average_user_ratings = ratings_total / float(users_total)
average_film_ratings = ratings_total / float(films_total)
completeness = float(ratings_total) / (films_total * users_total)

# user id -> mean rating given by that user.  The query result is consumed
# exactly once, so caching it would only occupy memory.
users_avg = sqlContext \
    .sql("""SELECT users.id, AVG(ratings.rating) AS average
            FROM users
            INNER JOIN ratings ON users.id = ratings.user_id
            GROUP BY users.id""")
users_avg = dict(users_avg.map(lambda r: (r.id, r.average)).collect())

# The most active users (more than 50 ratings), one per student login.
selected_users = sqlContext \
    .sql("""SELECT users.id, COUNT(ratings.film_id) AS ratings_count
            FROM users
            INNER JOIN ratings ON users.id = ratings.user_id
            GROUP BY users.id
            HAVING ratings_count > 50
            ORDER BY ratings_count DESC""") \
    .take(len(logins))

# Build both look-up tables from a single collect() instead of running one
# Spark job per user and one per film:
#   users_films_ratings[user_id][film_id] -> rating
#   films_users_ratings[film_id][user_id] -> rating
# Every known id gets an entry up front, so a film without ratings still maps
# to an empty dict.
users_films_ratings = {uid: {} for uid in uids}
films_users_ratings = {fid: {} for fid in fids}
for rating_row in ratings.collect():
    users_films_ratings[rating_row.user_id][rating_row.film_id] = rating_row.rating
    # Ratings for a film id missing from the film list are kept out of the
    # per-film table.
    film_raters = films_users_ratings.get(rating_row.film_id)
    if film_raters is not None:
        film_raters[rating_row.user_id] = rating_row.rating
# Predictors
# The per-user / per-film ratings are already held in users_films_ratings and
# films_users_ratings, so they are read from there instead of launching a new
# Spark job for every user and every film.

# users_pred[uid]: damped (+10 in the denominator) sum of the user's rating
# deviations from the global average.
users_pred = {}
for uid in uids:
    uratings = users_films_ratings[uid].values()
    deviation = sum(uratings) - len(uratings) * average_rating
    users_pred[uid] = 1.0 / (len(uratings) + 10) * deviation

# films_pred[fid]: damped (+25 in the denominator) sum of what is left of each
# rating after removing the global average and the rater's own offset.
films_pred = {}
for fid in fids:
    fratings = films_users_ratings[fid]
    residual = 0.0
    for uid, rating in fratings.items():
        residual += rating - average_rating - users_pred[uid]
    films_pred[fid] = 1.0 / (len(fratings) + 25) * residual

# predictors[fid][uid]: baseline estimate of user uid's rating of film fid.
predictors = {}
for fid in fids:
    film_row = {}
    for uid in uids:
        film_row[uid] = average_rating + users_pred[uid] + films_pred[fid]
    predictors[fid] = film_row

# clear_ratings[uid][fid]: actual rating minus baseline; 0 for unrated films.
clear_ratings = {}
for uid in uids:
    rated = users_films_ratings[uid]
    user_row = {}
    for fid in fids:
        if fid in rated:
            user_row[fid] = rated[fid] - predictors[fid][uid]
        else:
            user_row[fid] = 0
    clear_ratings[uid] = user_row
# Film-film closeness
# films_s[a][b] is the cosine similarity between the residual-rating vectors
# (clear_ratings) of films a and b.  A film's similarity to itself is pinned
# to -1000 so it sorts last among its own neighbours.

# The squared norm depends on a single film, so compute it once per film
# rather than once per pair.
film_norms = {}
for fid in fids:
    norm = 0.0
    for uid in uids:
        norm += clear_ratings[uid][fid] ** 2
    film_norms[fid] = norm

# Create every row up front.  Re-creating a row inside the loop would wipe the
# symmetric entries written earlier and force each pair to be computed twice.
films_s = {fid: {fid: -1000} for fid in fids}

for pos, fid1 in enumerate(fids):
    row1 = films_s[fid1]
    norm1 = film_norms[fid1]
    # Only pair fid1 with films that precede it; the value is mirrored below.
    for fid2 in fids[:pos]:
        norm2 = film_norms[fid2]
        if norm1 == 0.0 or norm2 == 0.0:
            # A film with an all-zero residual vector (e.g. no ratings) has no
            # defined cosine; use 0.0 instead of dividing by zero.
            closeness = 0.0
        else:
            dot = 0.0
            for uid in uids:
                dot += clear_ratings[uid][fid1] * clear_ratings[uid][fid2]
            closeness = dot / math.sqrt(norm1) / math.sqrt(norm2)
        row1[fid2] = closeness
        films_s[fid2][fid1] = closeness
    # Progress indicator: one line per processed film.
    print('----------------------> fid: %s' % fid1)

# For each film keep the ids of the 30 closest films: highest similarity
# first, ties broken by the smaller film id.
films_neighbours = {}
for fid in fids:
    ranked = sorted(films_s[fid].items(), key=lambda x: (-x[1], x[0]))
    films_neighbours[fid] = [entry[0] for entry in ranked[0:30]]
# Produce one result document per student login.  zip() pairs each login with
# one of the selected users and stops at the shorter list, so having fewer
# eligible users than logins no longer raises an IndexError.
for login, user in zip(logins, selected_users):
    lab_result = {}

    # calculate_pearson() reads these module-level names, so they have to be
    # refreshed for the current user before it is called.
    user_ratings = users_films_ratings[user.id]
    user_films = set(user_ratings)
    user_avg = users_avg[user.id]

    # --- User-based part: the 30 most correlated other users ---------------
    # Everything needed is already in local memory, so the ranking is done
    # with a plain sort (ties broken by the smaller user id) instead of
    # building and caching a DataFrame for every login.
    pearsons = [calculate_pearson(user.id, other_uid)
                for other_uid in uids if other_uid != user.id]
    user_neighbours = sorted(
        pearsons, key=lambda r: (-r.corrected_pearson, r.user_id))[0:30]

    # Predicted rating for every film the user has not rated yet: the user's
    # own average plus the weighted mean deviation of the neighbours who did
    # rate the film.
    films_predicted_rating = {}
    for fid in fids:
        if fid in user_films:
            continue
        weighted_sum = 0.0
        weight_total = 0.0
        for neighbour in user_neighbours:
            neighbour_ratings = users_films_ratings[neighbour.user_id]
            if fid in neighbour_ratings:
                weighted_sum += neighbour.corrected_pearson * (
                    neighbour_ratings[fid] - users_avg[neighbour.user_id])
                weight_total += abs(neighbour.corrected_pearson)
        result = user_avg
        if weight_total != 0.0:
            result += weighted_sum / weight_total
        films_predicted_rating[fid] = result
    user_suggested = sorted(films_predicted_rating.items(),
                            key=lambda x: (-x[1], x[0]))[0:10]

    # --- Item-based part: baseline plus neighbour-film residuals -----------
    # NOTE(review): unlike the user-based list above, this ranking does not
    # skip films the user has already rated -- confirm that this is intended.
    user_clear = clear_ratings[user.id]
    user_films_ratings = {}
    user_films_ratings_positive = {}
    for fid in fids:
        baseline = predictors[fid][user.id]
        closeness = films_s[fid]
        sum_all = 0.0
        weight_all = 0.0
        sum_positive = 0.0
        weight_positive = 0.0
        for nfid in films_neighbours[fid]:
            # Only neighbour films the user actually rated contribute.
            if nfid not in user_films:
                continue
            similarity = closeness[nfid]
            sum_all += similarity * user_clear[nfid]
            weight_all += abs(similarity)
            # Second variant: positively correlated neighbours only.
            if similarity > 0:
                sum_positive += similarity * user_clear[nfid]
                weight_positive += abs(similarity)
        rtg = baseline
        if weight_all != 0.0:
            rtg += sum_all / weight_all
        rtg2 = baseline
        if weight_positive != 0.0:
            rtg2 += sum_positive / weight_positive
        user_films_ratings[fid] = rtg
        user_films_ratings_positive[fid] = rtg2

    top_all = sorted(user_films_ratings.items(),
                     key=lambda x: (-x[1], x[0]))[0:10]
    top_positive = sorted(user_films_ratings_positive.items(),
                          key=lambda x: (-x[1], x[0]))[0:10]

    # --- Store and report ---------------------------------------------------
    lab_result['user_id'] = user.id
    # Neighbour ids are read by field name, not by position in the Row.
    lab_result['pearson_neighbours'] = [n.user_id for n in user_neighbours]
    lab_result['pearson_top10'] = [entry[0] for entry in user_suggested]
    lab_result['predicators_top10'] = [entry[0] for entry in top_all]
    lab_result['predicators_positive_top10'] = [entry[0] for entry in top_positive]

    mongo.npl.students.update({'_id': login['_id']},
                              {'$set': {'lab11': lab_result}},
                              upsert=False, multi=False)

    print('%s %s' % (login['_id'], lab_result))
    print('\n')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment