Skip to content

Instantly share code, notes, and snippets.

@porthunt
Last active October 1, 2016 19:48
Show Gist options
  • Save porthunt/4a2746bbb12bc3f129b1081e8f461bb8 to your computer and use it in GitHub Desktop.
Save porthunt/4a2746bbb12bc3f129b1081e8f461bb8 to your computer and use it in GitHub Desktop.
import pandas
import operator
import math
import sys
# Ratings table loaded by define_dataframe(); None until a CSV has been read.
dataframe = None
# Path of the CSV behind `dataframe`; mean_adjusted_dataframe() re-reads it.
file_name = None
# (lowest, highest) value that rating() lets a prediction take.
rating_limits = (0.5, 5.0)
def __avg__(lst):
    """Return the arithmetic mean of the numbers in *lst* as a float.

    The sum is converted to ``float`` before dividing so that a list of
    integers is not truncated by integer division on Python 2.

    Raises:
        ZeroDivisionError: if *lst* is empty.
    """
    return float(sum(lst)) / len(lst)
def rating(rate):
    """Clamp *rate* into the closed interval given by ``rating_limits``.

    A value below the lower limit comes back as the lower limit, a value
    above the upper limit comes back as the upper limit, and anything else
    is returned unchanged.
    """
    lowest = rating_limits[0]
    if rate < lowest:
        return lowest
    highest = rating_limits[1]
    if rate > highest:
        return highest
    return rate
def define_dataframe(csv_file):
    """Load *csv_file* into the module-level ``dataframe``.

    On success the parsed table is stored in the global ``dataframe`` and
    the path is remembered in the global ``file_name``. Neither global is
    assigned when reading fails.

    Args:
        csv_file: path of the CSV file to read with ``pandas.read_csv``.

    Raises:
        Exception: with the message 'Not a valid file.' when pandas reports
            that the file contains no data.
    """
    global dataframe
    global file_name

    # EmptyDataError is published in pandas.errors; older releases only had
    # it under pandas.io.common, so fall back to that location.
    try:
        from pandas.errors import EmptyDataError
    except ImportError:
        from pandas.io.common import EmptyDataError

    try:
        loaded = pandas.read_csv(csv_file)
    except EmptyDataError:
        raise Exception('Not a valid file.')

    dataframe = loaded
    file_name = csv_file
def mean_adjusted_dataframe():
    """Return a copy of the ratings table with each user's mean subtracted.

    The CSV named by the global ``file_name`` is read again with every cell
    kept as a string. Cells equal to the '?' marker are left untouched;
    every other cell is replaced by ``float(cell) - <mean of that row>``.
    Row means come from ``user_rates``, which reads the global ``dataframe``.

    Returns:
        pandas.DataFrame: same labels as the file; adjusted cells hold
        floats, unrated cells still hold '?'.
    """
    # Object dtype lets float results and '?' markers share a column.
    adjusted_df = pandas.read_csv(file_name, dtype=str).astype(object)

    averages = {}
    for user in adjusted_df.index.values:
        averages[user] = __avg__(user_rates(user))

    for row in adjusted_df.index.values:
        for column in adjusted_df.columns:
            # .at is the label-based scalar accessor; it replaces
            # DataFrame.get_value/set_value, which pandas has removed.
            cell_value = adjusted_df.at[row, column]
            # Compare by value: `is not '?'` tests object identity and only
            # worked when the interpreter happened to share the string.
            if cell_value != '?':
                adjusted_df.at[row, column] = float(cell_value) - averages[row]
    return adjusted_df
def print_dataframe():
    """Print the loaded ratings table framed by blank lines.

    When no table has been loaded yet (the global ``dataframe`` is still
    ``None``) the message 'Dataframe was not created.' is printed instead.
    """
    # Explicit check instead of an assert: asserts vanish under
    # ``python -O``, and ``except AssertionError, e`` does not parse on
    # Python 3.
    if dataframe is None:
        print('Dataframe was not created.')
        return
    print('\n')
    print(dataframe)
    print('\n')
def rate(user, item):
    """Return the rating *user* gave *item*, or ``None`` if it is unrated.

    Args:
        user: row label in the global ``dataframe``.
        item: column label in the global ``dataframe``.

    Returns:
        The cell converted to ``float``, or ``None`` when the cell cannot be
        parsed as a number (for example the '?' marker).

    Raises:
        Exception: with the message 'Not a valid user or item.' when a
            label is missing from the table.
    """
    try:
        # .at is the label-based scalar accessor; DataFrame.get_value was
        # deprecated and later removed from pandas.
        return float(dataframe.at[user, item])
    except ValueError:
        return None
    except KeyError:
        raise Exception('Not a valid user or item.')
def item_rates(item, **kwargs):
    """Return the ratings found in column *item*, rounded to 2 decimals.

    Keyword options:
        df: table to read from; defaults to the global ``dataframe``.
        complete: when true, cells that cannot be parsed as a number are
            kept as ``None``; otherwise (the default) they are skipped.
    """
    keep_gaps = kwargs.get('complete', False)
    frame = kwargs.get('df', dataframe)
    collected = []
    for cell in frame[item]:
        try:
            value = round(float(cell), 2)
        except ValueError:
            if not keep_gaps:
                continue
            value = None
        collected.append(value)
    return collected
def user_rates(user, **kwargs):
    """Return the ratings in *user*'s row as a list of floats.

    Args:
        user: row label in the global ``dataframe``.
        **kwargs: ``complete`` (default ``False``) - when true, cells that
            cannot be parsed as a number are kept as ``None`` so the list
            stays aligned with the columns; otherwise they are skipped.

    Returns:
        list: the parsed ratings, in column order.
    """
    keep_gaps = kwargs.get('complete', False)
    rates = []
    # .loc is the label-based row accessor; the .ix indexer used here
    # before was deprecated and later removed from pandas.
    for cell in dataframe.loc[user]:
        try:
            rates.append(float(cell))
        except ValueError:
            if keep_gaps:
                rates.append(None)
    return rates
def sim_user(user_a, user_b):
    """Return the Pearson correlation between the ratings of two users.

    Only items rated by both users contribute to the sums. Each rating is
    centred on its user's mean over everything that user rated, which is
    the same mean ``pred_user_based`` uses.

    Args:
        user_a: row label of the first user.
        user_b: row label of the second user.

    Returns:
        float: the correlation, or ``0.0`` when it is undefined because a
        user has no ratings, the users share no rated item, or one of them
        has no variance on the shared items.
    """
    user_a_rates = user_rates(user_a, complete=True)
    user_b_rates = user_rates(user_b, complete=True)

    rated_a = [value for value in user_a_rates if value is not None]
    rated_b = [value for value in user_b_rates if value is not None]
    if not rated_a or not rated_b:
        return 0.0

    # Average each user's own parsed ratings. dataframe.mean(1) either
    # fails on columns holding the '?' marker or silently drops those
    # columns for every user, depending on the pandas version.
    user_a_avg = __avg__(rated_a)
    user_b_avg = __avg__(rated_b)

    numerator = 0
    denominator_1 = 0
    denominator_2 = 0
    for element_a, element_b in zip(user_a_rates, user_b_rates):
        if element_a is None or element_b is None:
            continue
        deviation_a = element_a - user_a_avg
        deviation_b = element_b - user_b_avg
        numerator += deviation_a * deviation_b
        denominator_1 += math.pow(deviation_a, 2)
        denominator_2 += math.pow(deviation_b, 2)

    denominator = math.sqrt(denominator_1) * math.sqrt(denominator_2)
    if denominator == 0:
        # Avoid ZeroDivisionError; callers only keep similarities > 0.
        return 0.0
    return numerator / denominator
def sim_item(item_a, item_b, adjusted_df):
    """Return the cosine similarity between two item columns.

    Only rows in which both columns hold a number contribute to the sums.
    ``pred_item_based`` passes the table built by
    ``mean_adjusted_dataframe`` as *adjusted_df*.

    Args:
        item_a: column label of the first item.
        item_b: column label of the second item.
        adjusted_df: table the two columns are read from.

    Returns:
        float: the similarity, or ``0.0`` when it is undefined because no
        row has both values or one column's contributing values are all
        zero.
    """
    item_a_rates = item_rates(item_a, complete=True, df=adjusted_df)
    item_b_rates = item_rates(item_b, complete=True, df=adjusted_df)

    numerator = 0
    denominator_1 = 0
    denominator_2 = 0
    for element_a, element_b in zip(item_a_rates, item_b_rates):
        if element_a is None or element_b is None:
            continue
        numerator += element_a * element_b
        denominator_1 += math.pow(element_a, 2)
        denominator_2 += math.pow(element_b, 2)

    denominator = math.sqrt(denominator_1) * math.sqrt(denominator_2)
    if denominator == 0:
        # Avoid ZeroDivisionError; callers only keep similarities > 0.
        return 0.0
    return numerator / denominator
def pred_user_based(curr_user, curr_item):
    """Predict *curr_user*'s rating of *curr_item* from similar users.

    Neighbours are the other users whose ``sim_user`` score with
    *curr_user* is positive and who rated *curr_item*. The prediction is
    *curr_user*'s mean rating plus the similarity-weighted average of the
    neighbours' mean-centred ratings of the item, clamped by ``rating``.

    Args:
        curr_user: row label of the user to predict for.
        curr_item: column label of the item to predict.

    Returns:
        The clamped prediction. When no neighbour qualifies, the clamped
        mean rating of *curr_user* is returned instead of dividing by zero.
    """
    curr_user_avg = __avg__(user_rates(curr_user))

    similarities = {}
    for data_user in dataframe.index.values:
        # The user is never their own neighbour, so skip the comparison.
        if data_user == curr_user:
            continue
        sim = sim_user(curr_user, data_user)
        if sim > 0:
            similarities[data_user] = sim

    numerator = 0
    denominator = 0
    for data_user, sim_ratio in similarities.items():
        ratio = rate(data_user, curr_item)
        if ratio is not None:
            numerator += sim_ratio * (ratio - __avg__(user_rates(data_user)))
            denominator += sim_ratio

    if denominator == 0:
        # No positively similar user rated the item.
        return rating(curr_user_avg)
    return rating(curr_user_avg + (numerator / denominator))
def pred_item_based(curr_user, curr_item):
    """Predict *curr_user*'s rating of *curr_item* from similar items.

    Item similarities come from ``sim_item`` on the table returned by
    ``mean_adjusted_dataframe``. The prediction is the similarity-weighted
    average of *curr_user*'s ratings of the other items whose similarity to
    *curr_item* is positive, clamped by ``rating``.

    Args:
        curr_user: row label of the user to predict for.
        curr_item: column label of the item to predict.

    Returns:
        The clamped prediction. When the user rated no positively similar
        item, the clamped mean of the user's ratings is returned instead of
        dividing by zero.
    """
    adj_df = mean_adjusted_dataframe()

    similarities = {}
    for data_item in dataframe.columns:
        # The item is never its own neighbour, so skip the comparison.
        if data_item == curr_item:
            continue
        sim = sim_item(curr_item, data_item, adj_df)
        if sim > 0:
            similarities[data_item] = sim

    numerator = 0
    denominator = 0
    for data_item, sim_ratio in similarities.items():
        item_ratio = rate(curr_user, data_item)
        if item_ratio is not None:
            # rate() already returns a float.
            numerator += sim_ratio * item_ratio
            denominator += sim_ratio

    if denominator == 0:
        # Nothing to weight: fall back to the user's own mean rating.
        return rating(__avg__(user_rates(curr_user)))
    return rating(numerator / denominator)
def pred(user, item):
    """Return ``(item_based, user_based)`` predictions for *user* on *item*."""
    item_based = pred_item_based(user, item)
    user_based = pred_user_based(user, item)
    return item_based, user_based
import sys
import os
from rating import *
''' Dataset (based on Table 2.1 from 'Recommender Systems - An Introduction', Cambridge University Press):
Item1,Item2,Item3,Item4,Item5
Alice,5,3,4,4,?
User1,3,1,2,3,3
User2,4,3,4,3,5
User3,3,3,1,5,4
User4,1,5,5,2,1
'''
# how to use: python recsys.py dataset.csv Alice Item5
def valid_file(file_name):
    """Tell whether *file_name* is an existing file with a CSV extension.

    Args:
        file_name: path to check.

    Returns:
        bool: ``True`` only when ``os.path.isfile`` accepts the path and
        its extension, compared case-insensitively, is an accepted one.
    """
    valid_extensions = ('.csv',)
    # splitext yields '' for a name without a dot, so a file literally
    # called "csv" is not mistaken for one with a .csv extension.
    extension = os.path.splitext(file_name)[1].lower()
    return os.path.isfile(file_name) and extension in valid_extensions
if __name__ == "__main__":
    # Validate the command line explicitly: asserts disappear under
    # ``python -O``, ``except AssertionError, e`` only parses on Python 2,
    # and ``is 3`` compares object identity rather than value.
    if len(sys.argv) - 1 != 3:
        print("need 3 arguments")
        sys.exit(1)
    if not valid_file(sys.argv[1]):
        print("not a valid file")
        sys.exit(1)

    user = sys.argv[2]
    item = sys.argv[3]
    define_dataframe(sys.argv[1])
    print_dataframe()

    # Keep the result under its own name so the rate() function is not
    # shadowed for the rest of the script.
    known_rate = rate(user, item)
    if known_rate is not None:
        print('The rate of {} for the {} is {}.\n'.format(
            user, item, known_rate))
    else:
        # Unrated: show how much data exists, then both predictions.
        print('Users that rated {}: {}.'.format(item, len(item_rates(item))))
        print('Items rated by {}: {}.'.format(user, len(user_rates(user))))
        pred_item, pred_user = pred(user, item)
        print('Prediction based on items: %.2f.' % round(pred_item, 2))
        print('Prediction based on users: %.2f.' % round(pred_user, 2))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment