# Gist nerdroychan/fcde3e7b3496c6d6cda6b6b080e455ac
# Created November 27, 2016 15:34
# (GitHub page notice: save nerdroychan/fcde3e7b3496c6d6cda6b6b080e455ac to
# your computer and use it in GitHub Desktop.)
# (GitHub page notice: this file contains hidden or bidirectional Unicode text
# that may be interpreted or compiled differently than what appears below. To
# review, open the file in an editor that reveals hidden Unicode characters.)
import numpy as np | |
import matplotlib as plt | |
import random | |
import copy | |
import csv | |
import difflib | |
R = []
# Read the training set and store it transposed: R[c] collects the c-th field
# of every CSV row, so R[c][r] holds row r / column c of the file.
# newline='' is what the csv module documents for files handed to csv.reader.
with open('training.csv', newline='') as training_file:
    reader = csv.reader(training_file)
    for row in reader:
        for c, column in enumerate(row):
            # Grow R by one list the first time column index c is reached.
            # An explicit length check replaces the former bare `except:`,
            # which also trapped float() parse errors and appended a stray
            # empty list before failing a second time.
            if c == len(R):
                R.append([])
            R[c].append(float(column))
# R holds one list per person and one entry per movie (the CSV was transposed
# while loading), so its outer/inner lengths give the two dimensions.
people_num = len(R)
movie_num = len(R[0])

# Walk every cell once:
#   * -1.0 marks a held-out cell: remember its (person, movie) position and
#     blank it to 0.0 in R;
#   * any other non-zero value counts as a training sample;
#   * 0.0 cells are left alone and not counted.
test_points = []
sample_num = 0
test_num = 0
for person in range(people_num):
    person_ratings = R[person]
    for movie in range(movie_num):
        value = person_ratings[movie]
        if value == -1.0:
            person_ratings[movie] = 0.0
            test_num += 1
            test_points.append((person, movie))
        elif value != 0.0:
            sample_num += 1
R_test = []
# Read the testing set with the same transposed layout as the training set:
# R_test[c][r] holds row r / column c of the file.
# newline='' is what the csv module documents for files handed to csv.reader.
with open('testing.csv', newline='') as testing_file:
    reader = csv.reader(testing_file)
    for row in reader:
        for c, column in enumerate(row):
            # Grow R_test by one list the first time column index c is
            # reached. An explicit length check replaces the former bare
            # `except:`, which also trapped float() parse errors and appended
            # a stray empty list before failing a second time.
            if c == len(R_test):
                R_test.append([])
            R_test[c].append(float(column))
# Now, R is the training set, and the held-out positions are in test_points.
# Mean of the training samples: cells stored as 0.0 (including the blanked
# held-out cells) add nothing to the numerator and are not in sample_num.
sample_avg = sum([sum(x) for x in R]) / sample_num

# Least-squares system A @ [b_u; b_i] ~= c with one row per training sample:
# a 1 in the sample's person column and a 1 in its movie column, and the
# mean-centred rating on the right-hand side.
A = np.zeros((sample_num, people_num + movie_num))
c = np.zeros(sample_num)
count = 0
for j in range(movie_num):
    for i in range(people_num):
        if R[i][j] != 0.0:
            c[count] = R[i][j] - sample_avg
            A[count, i] = 1
            A[count, people_num + j] = 1
            count += 1

# First, using baseline method
# The original evaluated pinv(A) @ pinv(A.T) @ A.T @ c. Because
# pinv(A.T) @ A.T == A @ pinv(A) and pinv(A) @ A @ pinv(A) == pinv(A), that
# product equals pinv(A) @ c, so one pseudo-inverse is enough.
sol = np.dot(np.linalg.pinv(A), c)
b_u = sol[:people_num]   # per-person bias terms
b_i = sol[people_num:]   # per-movie bias terms

# Baseline prediction (mean + person bias + movie bias, clipped to [1, 5]) for
# every training sample and every held-out position; other cells keep R's 0.0.
R_hat = copy.deepcopy(R)
held_out = set(test_points)  # O(1) membership instead of scanning the list
for j in range(movie_num):
    for i in range(people_num):
        if R_hat[i][j] != 0.0 or (i, j) in held_out:
            tmp = sample_avg + b_u[i] + b_i[j]
            R_hat[i][j] = float(max(min(5, tmp), 1))
# Training RMSE of the baseline predictor: squared error accumulated over the
# non-zero cells of R, averaged over sample_num.
count = 0
squared_error = 0
for movie in range(movie_num):
    for person in range(people_num):
        actual = R[person][movie]
        if actual == 0.0:
            continue
        squared_error += (R_hat[person][movie] - actual) ** 2
RMSE_training = np.sqrt(squared_error / sample_num)
print('Baseline method training RMSE: ', RMSE_training)

# Testing RMSE of the baseline predictor: prediction vs. R_test at every
# held-out (person, movie) position.
squared_error = 0
for person, movie in test_points:
    squared_error += (R_hat[person][movie] - R_test[person][movie]) ** 2
RMSE_testing = np.sqrt(squared_error / len(test_points))
print('Baseline method testing RMSE: ', RMSE_testing)
# Second, using neighbourhood method
# Reset the baseline predictions at the held-out positions to 0.0; R is 0.0
# there as well, so the residual computed below is 0 at those cells.
for person, movie in test_points:
    R_hat[person][movie] = 0.0
# Residual of R with respect to the baseline prediction R_hat.
R_wave = np.array(R) - np.array(R_hat)


def _shrunk_similarity_matrix(ratings, residuals, shrink):
    """Return the symmetric pairwise similarity matrix as a list of lists.

    ``ratings[a]`` and ``residuals[a]`` are the rating vector and the residual
    vector of entity ``a`` (a movie or a person). For each pair ``(a, b)`` only
    positions where both ratings are non-zero are used:

    * ``dot`` is the sum of the residual products,
    * each norm is the sum of ``(residual + 1E-3) ** 2``,
    * the similarity is ``dot / sqrt(norm_a * norm_b)`` scaled by
      ``common / (common + shrink)``, where ``common`` is the number of shared
      positions.

    When the product of the norms is 0 (e.g. nothing in common) the
    similarity is set to ``1E-5``. The diagonal stays 0. Each unordered pair
    is evaluated once and mirrored; the expression is symmetric in ``a`` and
    ``b``, so the values match a full double loop.
    """
    size = len(ratings)
    sim = [[0 for _ in range(size)] for _ in range(size)]
    for a in range(size):
        rat_a = ratings[a]
        res_a = residuals[a]
        for b in range(a + 1, size):
            common = 0
            dot = 0
            norm_a = 0
            norm_b = 0
            for ra, rb, wa, wb in zip(rat_a, ratings[b], res_a, residuals[b]):
                if ra != 0.0 and rb != 0.0:
                    common += 1
                    dot += wa * wb
                    norm_a += (wa + 1E-3) ** 2
                    norm_b += (wb + 1E-3) ** 2
            if norm_a * norm_b == 0:
                deg = 1E-5
            else:
                deg = dot / np.sqrt(norm_a * norm_b) * (common / (common + shrink))
            sim[a][b] = deg
            sim[b][a] = deg
    return sim


# Movie to movie interaction: compare movie columns of R / R_wave.
compress_ratio = 4
D = _shrunk_similarity_matrix(
    [[R[k][i] for k in range(people_num)] for i in range(movie_num)],
    R_wave.T,
    compress_ratio,
)
# User to User method: compare person rows of R / R_wave (no shrinkage).
compress_ratio = 0
U = _shrunk_similarity_matrix(R, R_wave, compress_ratio)
# Neighbourhood prediction: baseline + similarity-weighted average of the
# residuals of neighbouring movies (via D) and neighbouring people (via U).
L = movie_num//11   # number of movie neighbours kept per movie
Lu = people_num     # number of people neighbours kept per person
held_out = set(test_points)  # O(1) membership instead of scanning the list

# The neighbour selection for a person depends only on that person, so build
# it once per person instead of once per (movie, person) cell.
# NOTE: neighbours are matched by similarity *value*, exactly as before, so
# every entry tied with a selected value is included as well.
user_neighbours = [
    set(sorted(U[i][:i] + U[i][i+1:], key=abs, reverse=True)[:Lu])
    for i in range(people_num)
]

for j in range(movie_num):
    # Largest-|similarity| values for movie j; depends only on j, so it is
    # computed outside the person loop.
    srt_d = set(sorted(D[j][:j] + D[j][j+1:], key=abs, reverse=True)[:L])
    for i in range(people_num):
        # Only training samples and held-out cells are ever written back,
        # so skip every other cell before doing any work.
        if R[i][j] == 0 and (i, j) not in held_out:
            continue
        tmp = sample_avg + b_u[i] + b_i[j]
        # Add D
        s = 0
        m = 0
        for k in range(movie_num):
            if j != k and D[j][k] in srt_d:
                s += D[j][k] * R_wave[i][k]
                m += abs(D[j][k])
        # No selected neighbour (e.g. L == 0) or all-zero weights: leave the
        # prediction uncorrected instead of dividing by zero.
        if m != 0:
            tmp += s / m
        # Add U
        s = 0
        m = 0
        srt_u = user_neighbours[i]
        for k in range(people_num):
            if i != k and U[i][k] in srt_u:
                s += U[k][i] * R_wave[k][j]
                m += abs(U[i][k])
        if m != 0:
            tmp += s / m
        # Clip to the [1, 5] range before storing.
        R_hat[i][j] = max(min(5, tmp), 1)
# Training RMSE of the neighbourhood predictor: squared error accumulated over
# the non-zero cells of R, averaged over the number of such cells.
squared_error = 0
count = 0
for movie in range(movie_num):
    for person in range(people_num):
        actual = R[person][movie]
        if actual == 0.0:
            continue
        squared_error += (R_hat[person][movie] - actual) ** 2
        count += 1
RMSE_training = np.sqrt(squared_error / count)
print('Neighbourhood method training RMSE: ', RMSE_training)

# Testing RMSE of the neighbourhood predictor: prediction vs. R_test at every
# held-out (person, movie) position.
squared_error = 0
for person, movie in test_points:
    squared_error += (R_hat[person][movie] - R_test[person][movie]) ** 2
RMSE_testing = np.sqrt(squared_error / len(test_points))
print('Neighbourhood method testing RMSE: ', RMSE_testing)
# (GitHub page footer: sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.)