Created
September 14, 2012 22:27
-
-
Save shaldengeki/3725335 to your computer and use it in GitHub Desktop.
user similarities map reduce job. this works, but disregards topics where only one user has posted.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mrjob.job import MRJob | |
''' | |
Some constants. | |
''' | |
PRIOR_CORRELATION = 0 | |
PRIOR_COUNT = 10 | |
MIN_INTERSECTION = 20 | |
MIN_TOPICS = 20 | |
def fill_postcounts(v1, v2): | |
''' | |
given two dicts of postcounts, return filled dicts where the zeroes from one are | |
''' | |
topics = list(set(v1.keys()) | set(v2.keys())) | |
for topic in topics: | |
if topic not in v1: | |
v1[topic] = 0 | |
if topic not in v2: | |
v2[topic] = 0 | |
return (v1, v2) | |
def dotProduct(v1, v2): | |
return sum(map(lambda x: v1[x]*v2[x], v1)) | |
def normSq(v1): | |
return sum(map(lambda x: pow(v1[x], 2), v1)) | |
# Put metrics here. | |
def correlation(size, dotProd, postCountSum, postCount2Sum, postCountNormSq, postCount2NormSq): | |
numerator = size * dotProd - postCountSum * postCount2Sum | |
denominator = pow((size * postCountNormSq - postCountSum * postCountSum) * (size * postCount2NormSq - postCount2Sum * postCount2Sum), 0.5) | |
if denominator == 0: | |
return 0 | |
else: | |
return float(numerator) / denominator | |
def correlation_regularized(size, dotProd, postCountSum, postCount2Sum, postCountNormSq, postCount2NormSq, virtualCount, priorCorrelation): | |
unRegularized = correlation(size, dotProd, postCountSum, postCount2Sum, postCountNormSq, postCount2NormSq) | |
w = float(size) / (size + virtualCount) | |
return w * unRegularized + (1 - w) * priorCorrelation | |
def jaccard(commonTopics, totalTopics1, totalTopics2): | |
union = totalTopics1 + totalTopics2 - commonTopics | |
if union == 0: | |
return 0 | |
else: | |
return float(commonTopics) / union | |
def cosine(dotProd, postCountNorm, postCount2Norm): | |
if postCountNorm * postCount2Norm == 0: | |
return 0 | |
else: | |
return float(dotProd) / (postCountNorm * postCount2Norm) | |
# add metrics to the below function. | |
def calculate_metrics(v1, v2): | |
''' | |
Calculates similarity metrics between two unfilled vectors. | |
Returns [metric1, metric2, ...] | |
''' | |
# first, store the non-zero counts for each vector. | |
totalTopics1 = len(v1) | |
totalTopics2 = len(v2) | |
sharedTopics = len(set(v1.keys()) & set(v2.keys())) | |
# next, fill both vectors. | |
(v1, v2) = fill_postcounts(v1, v2) | |
# first, calculate some vector-pair values that will be useful. | |
size = len(v1) | |
dotProd = dotProduct(v1, v2) | |
postCountSum = sum(v1.values()) | |
postCount2Sum = sum(v2.values()) | |
postCountNormSq = normSq(v1) | |
postCount2NormSq = normSq(v2) | |
# now calculate all appropriate metrics. | |
corr = correlation(size, dotProd, postCountSum, postCount2Sum, postCountNormSq, postCount2NormSq) | |
reg_corr = correlation_regularized(size, dotProd, postCountSum, postCount2Sum, postCountNormSq, postCount2NormSq, PRIOR_COUNT, PRIOR_CORRELATION) | |
jaccardSim = jaccard(sharedTopics, totalTopics1, totalTopics2) | |
cosineSim = cosine(dotProd, pow(postCountNormSq, 0.5), pow(postCount2NormSq, 0.5)) | |
return [corr, reg_corr, jaccardSim, cosineSim] | |
class MRUserSimilarities(MRJob): | |
def __init__(self, *args, **kwargs): | |
super(MRUserSimilarities, self).__init__(*args, **kwargs) | |
self.users = {} | |
def get_postcounts(self, key, line): | |
''' | |
Takes file input of the form userID\ttopicID\tpostCount | |
Returns (topicID, (userID, postCount)) | |
''' | |
splitLine = line.split() | |
# if int(splitLine[0]) not in self.users: | |
# self.users[int(splitLine[0])] = 1 | |
yield int(splitLine[1]), (int(splitLine[0]), int(splitLine[2])) | |
def assemble_topic_postcounts(self, topicID, values): | |
''' | |
Takes topicID as key and a list of (userID, postCount) tuples as values | |
Returns (userID1,userID2),(postCount1,postCount2) | |
''' | |
usersPosts = dict(values) | |
users = usersPosts.keys() | |
for userID1 in users: | |
users2 = [userID for userID in users if userID > userID1] | |
# need to figure out a way to pull topics where only one user has posted. | |
# for userID1 in self.users: | |
# users2 = [userID for userID in self.users if userID > userID1] | |
for userID2 in users2: | |
if userID1 not in usersPosts: | |
usersPosts[userID1] = 0 | |
if userID2 not in usersPosts: | |
usersPosts[userID2] = 0 | |
yield (userID1,userID2),(usersPosts[userID1],usersPosts[userID2]) | |
def pass_counts(self, userIDPair, postCountPair): | |
yield userIDPair,postCountPair | |
def get_similarities(self, userIDPair, postCounts): | |
''' | |
Takes userID and that userID's topicID:postCount dict | |
Returns userID pairs with similarity metrics. | |
''' | |
postCounts = list(postCounts) | |
if len(postCounts) >= MIN_INTERSECTION: | |
postCountKeys = range(len(postCounts)) | |
v1 = dict([(topicID, postCounts[topicID][0]) for topicID in postCountKeys]) | |
v2 = dict([(topicID, postCounts[topicID][1]) for topicID in postCountKeys]) | |
yield userIDPair,calculate_metrics(v1, v2) | |
def steps(self): | |
return [self.mr(mapper=self.get_postcounts, | |
reducer=self.assemble_topic_postcounts), | |
self.mr(mapper=self.pass_counts, | |
reducer=self.get_similarities)] | |
if __name__ == '__main__': | |
MRUserSimilarities.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment