Computation of similarity scores - Euclidean Distance and Pearson Correlations
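Both scores are computed only over the n titles two critics have rated in common. Writing a_i and b_i for the two critics' ratings of those shared titles (symbols introduced here for illustration, they do not appear in the code), the methods below compute an inverted, normalized Euclidean distance and the absolute sample (Pearson) correlation:

\[
\mathrm{euclidean}(A, B) = \frac{1}{1 + \sqrt{\sum_{i=1}^{n} (a_i - b_i)^2}}
\]

\[
\mathrm{pearson}(A, B) = \frac{\left|\, n \sum_i a_i b_i - \sum_i a_i \sum_i b_i \,\right|}
{\sqrt{\left(n \sum_i a_i^2 - \left(\sum_i a_i\right)^2\right)\left(n \sum_i b_i^2 - \left(\sum_i b_i\right)^2\right)}}
\]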
#!/usr/bin/env python

import operator

from math import sqrt
from reader import PreferenceReader


class SimilarityScoring(PreferenceReader):
    """
    Extends the preference reader class to give our data structure the
    ability to compute similarity scores between two critics and to give
    the functionality to output a ranked list of similar critics if given
    a single critic as an input.
    """

    def shared_rankings(self, criticA, criticB):
        """
        A helper method for finding the intersection of rated titles in
        the data for both critics, A and B.
        Returns a dictionary whose keys are the titles and whose values
        are tuples of preferences (A, B) for critics A and B respectively.
        Raises KeyError if criticA or criticB isn't in the data.
        """
        # Check if the two critics are in the data set.
        if criticA not in self:
            raise KeyError("Couldn't find critic '%s' in data" % criticA)
        if criticB not in self:
            raise KeyError("Couldn't find critic '%s' in data" % criticB)

        # Find the intersection of the rated titles in the data.
        # Iterate through the titles criticA has ranked and see if criticB
        # has also ranked them. If so, create a tuple of the preferences,
        # (A, B), in a shared rankings dictionary.
        rankings = {}
        for title in self[criticA]:
            if title in self[criticB]:
                rankings[title] = (self[criticA][title],
                                   self[criticB][title])
        return rankings

    def _compute_rank(self, metric, critic):
        """
        An internal helper method that computes the similarity of a critic
        to all other critics and returns them ranked in descending order
        according to the metric, which should be a method of the class.
        Raises KeyError if critic isn't in the data.
        """
        # Check if the critic is in the data set.
        if critic not in self:
            raise KeyError("Unknown critic, '%s'." % critic)

        # Check to ensure the `metric` argument is callable.
        if not callable(metric):
            raise TypeError("The 'metric' argument must be a callable.")

        # Score the similarity against all other critics.
        similarities = {}
        for other_critic in self:
            # Don't compare similarity against self!
            if other_critic == critic:
                continue
            # Store the similarity score in the dictionary.
            similarities[other_critic] = metric(critic, other_critic)

        # Return a ranked, ordered list:
        return sorted(similarities.items(), key=operator.itemgetter(1),
                      reverse=True)
    def euclidean_distance(self, criticA, criticB):
        """
        Reports the Euclidean distance of two critics, A and B, by
        performing a J-dimensional Euclidean calculation on their
        preference vectors over the intersection of titles both critics
        have rated. The distance is inverted and normalized so that more
        similar critics receive scores closer to 1.
        Raises KeyError if criticA or criticB isn't in the data.
        """
        # Get the intersection of the rated titles in the data.
        preferences = self.shared_rankings(criticA, criticB)

        # If they have no rankings in common, return 0.
        if len(preferences) == 0: return 0

        # Sum the squares of the differences.
        sum_of_squares = sum([pow(a-b, 2) for a, b in preferences.values()])

        # Return the inverse of the distance to give a higher score to
        # folks who are more similar (e.g. less distance); add 1 to prevent
        # division by zero errors and normalize ranks in [0, 1].
        return 1 / (1 + sqrt(sum_of_squares))
    def euclidean_rank(self, critic):
        """
        Returns an ordered list of critics, ranked by similarity to the
        critic given as input. The list contains (name, score) tuples,
        pairing each critic with their Euclidean similarity score.
        Raises KeyError if critic isn't in the data.
        """
        return self._compute_rank(self.euclidean_distance, critic)

    def pearson_correlation(self, criticA, criticB):
        """
        Returns the Pearson Correlation of two critics, A and B, by
        performing the PPMC calculation on the scatter plot of (a, b)
        ratings over the shared set of critiqued titles.
        Raises KeyError if criticA or criticB isn't in the data.
        TODO: Should we return the absolute value?
        """
        # Get the set of mutually rated items.
        preferences = self.shared_rankings(criticA, criticB)

        # Store the length to save traversals of the len computation.
        # If they have no rankings in common, return 0.
        length = len(preferences)
        if length == 0: return 0

        # Loop through the preferences of each critic once and compute the
        # various summations that are required for our final calculation.
        sumA = sumB = sumSquareA = sumSquareB = sumProducts = 0
        for a, b in preferences.values():
            sumA += a
            sumB += b
            sumSquareA += pow(a, 2)
            sumSquareB += pow(b, 2)
            sumProducts += a*b

        # Calculate Pearson Score
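        # This is the standard computational form of the sample Pearson
        # correlation coefficient:
        #   r = (n*sum(ab) - sum(a)*sum(b)) /
        #       sqrt((n*sum(a^2) - sum(a)^2) * (n*sum(b^2) - sum(b)^2))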
        numerator = (sumProducts*length) - (sumA*sumB)
        denominator = sqrt(((sumSquareA*length) - pow(sumA, 2))
                           * ((sumSquareB*length) - pow(sumB, 2)))

        # Prevent division by zero.
        if denominator == 0: return 0
        return abs(numerator / denominator)

    def pearson_rank(self, critic):
        """
        Returns an ordered list of critics, ranked by similarity to the
        critic given as input. The list contains (name, score) tuples,
        pairing each critic with their Pearson Correlation score.
        Raises KeyError if critic isn't in the data.
        """
        return self._compute_rank(self.pearson_correlation, critic)

if __name__ == "__main__":
    similarity = SimilarityScoring('../data/book_ratings.tsv')
    print "Euclidean Rank for Ben:"
    print "\n".join(["%s:\t%0.4f" % item for item in similarity.euclidean_rank('Benjamin Bengfort')])
    print "Pearson Rank for Ben:"
    print "\n".join(["%s:\t%0.4f" % item for item in similarity.pearson_rank('Benjamin Bengfort')])
# tests.similarity_tests
# Tests the similarity module in collabtut.
#
# Author: Benjamin Bengfort <[email protected]>
# Created: Fri Aug 23 09:16:09 2013 -0400
#
# Copyright (C) 2013 Cobrain Company
# For license information, see LICENSE.txt
#
# ID: similarity_tests.py [] [email protected] $

"""
Tests the similarity module in collabtut.
"""

##########################################################################
## Imports
##########################################################################

import os
import unittest

from math import sqrt
from collabtut.similarity import SimilarityScoring

##########################################################################
## Test Cases
##########################################################################
class SimilarityTest(unittest.TestCase):

    FIXTURE_PATH = 'test_ratings_fixture.tsv'

    def setUp(self):
        """
        Create test data fixture and write to file.
        """
        data = (
            ("Lisa", "Lady in the Water", 2.5),
            ("Lisa", "Snakes on a Plane", 3.5),
            ("Lisa", "Just My Luck", 3.0),
            ("Lisa", "Superman Returns", 3.5),
            ("Lisa", "You, Me, and Dupree", 2.5),
            ("Lisa", "The Night Listener", 3.0),
            ("Gene", "Lady in the Water", 3.0),
            ("Gene", "Snakes on a Plane", 3.5),
            ("Gene", "Just My Luck", 1.5),
            ("Gene", "Superman Returns", 5.0),
            ("Gene", "The Night Listener", 3.0),
            ("Gene", "You, Me, and Dupree", 3.5),
            ("Bill", "Superman Returns", 4.0),
            ("Bill", "The Last Samurai", 4.0),
            ("Bill", "Despicable Me", 5.0),
            ("Sean", "Superman Returns", 4.0),
            ("Sean", "The Last Samurai", 4.0),
            ("Sean", "Despicable Me", 4.0),
        )

        with open(self.FIXTURE_PATH, 'w') as testdata:
            for item in data:
                testdata.write("\t".join([str(i) for i in item]) + "\n")

        self.similarity = SimilarityScoring(self.FIXTURE_PATH)

    def tearDown(self):
        """
        Remove test data fixture.
        """
        os.remove(self.FIXTURE_PATH)
        self.similarity = None
    def test_sqrt(self):
        """
        Ensure square root computation is sane
        """
        val = sqrt(pow(4.5-4, 2) + pow(1-2, 2))
        self.assertEqual(val, 1.1180339887498949)

    def test_inversion(self):
        """
        Test square root inversion property
        """
        val = sqrt(pow(4.5-4, 2) + pow(1-2, 2))
        inv = 1 / (1+val)
        self.assertEqual(inv, 0.47213595499957939)

    def test_euclidean(self):
        """
        Test the Euclidean Distance computation
        """
        distance = self.similarity.euclidean_distance('Lisa', 'Gene')
        self.assertEqual(distance, 0.29429805508554946)

    def test_euclidean_symmetry(self):
        """
        Test Euclidean symmetry
        """
        self.assertEqual(self.similarity.euclidean_distance('Lisa', 'Gene'),
                         self.similarity.euclidean_distance('Gene', 'Lisa'))

    def test_pearson(self):
        """
        Test the Pearson Coefficient computation
        """
        correlation = self.similarity.pearson_correlation('Lisa', 'Gene')
        self.assertEqual(correlation, 0.39605901719066977)

    def test_square_domain(self):
        """
        Ensure no negative square root
        """
        correlation = self.similarity.pearson_correlation('Sean', 'Bill')
        self.assertGreaterEqual(correlation, 0.0)
        self.assertLessEqual(correlation, 1.0)

    def test_pearson_rank(self):
        """
        Test the Pearson ranking
        Expected:
            [('Gene', 0.39605901719066977), ('Sean', 0), ('Bill', 0)]
        """
        ranking = self.similarity.pearson_rank('Lisa')
        self.assertEqual(len(ranking), 3)
        self.assertEqual(ranking[0], ('Gene', 0.39605901719066977))
        self.assertEqual(ranking[1], ('Sean', 0))
        self.assertEqual(ranking[2], ('Bill', 0))

    def test_euclidean_rank(self):
        """
        Test the Euclidean ranking
        Expected:
            [('Sean', 0.6666666666666666), ('Bill', 0.6666666666666666),
             ('Gene', 0.29429805508554946)]
        """
        ranking = self.similarity.euclidean_rank('Lisa')
        self.assertEqual(len(ranking), 3)
        self.assertEqual(ranking[0], ('Sean', 0.6666666666666666))
        self.assertEqual(ranking[1], ('Bill', 0.6666666666666666))
        self.assertEqual(ranking[2], ('Gene', 0.29429805508554946))

    def test_exceptions(self):
        """
        Ensure unknown critics are handled
        """
        pearson = self.similarity.pearson_correlation
        euclidean = self.similarity.euclidean_distance
        shared = self.similarity.shared_rankings

        self.assertRaises(KeyError, pearson, 'Lisa', 'Ben')
        self.assertRaises(KeyError, pearson, 'Ben', 'Lisa')
        self.assertRaises(KeyError, euclidean, 'Lisa', 'Ben')
        self.assertRaises(KeyError, euclidean, 'Ben', 'Lisa')
        self.assertRaises(KeyError, shared, 'Lisa', 'Ben')
        self.assertRaises(KeyError, shared, 'Ben', 'Lisa')
    def test_range(self):
        """
        Ensure all metrics are in [0, 1]
        """
        euclidean = self.similarity.euclidean_distance
        pearson = self.similarity.pearson_correlation

        for criticA in self.similarity.critics:
            for criticB in self.similarity.critics:
                if criticA == criticB: continue

                # Assert 0 <= distance <= 1
                distance = euclidean(criticA, criticB)
                self.assertGreaterEqual(distance, 0.0)
                self.assertLessEqual(distance, 1.0)

                # Assert 0 <= correlation <= 1
                correlation = pearson(criticA, criticB)
                self.assertGreaterEqual(correlation, 0.0)
                self.assertLessEqual(correlation, 1.0)
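The test module does not define its own runner. Assuming it is saved as tests/similarity_tests.py alongside an importable collabtut package (the layout implied by the header comments and imports), the standard unittest entry point is enough to execute it, for example by appending:

if __name__ == "__main__":
    unittest.main()

Alternatively, running python -m unittest tests.similarity_tests from the project root should work, assuming tests is set up as a package.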