Skip to content

Instantly share code, notes, and snippets.

@shaldengeki
Created September 14, 2012 22:27
Show Gist options
  • Save shaldengeki/3725335 to your computer and use it in GitHub Desktop.
Save shaldengeki/3725335 to your computer and use it in GitHub Desktop.
User-similarities MapReduce job. This works, but it disregards topics where only one user has posted.
from mrjob.job import MRJob
# Module-level tuning constants.

# Prior value that correlation_regularized() shrinks toward
# (passed as priorCorrelation by calculate_metrics()).
PRIOR_CORRELATION = 0
# Virtual sample count used as the regularization weight
# (passed as virtualCount by calculate_metrics()).
PRIOR_COUNT = 10
# Minimum number of post-count pairs a user pair must have before
# MRUserSimilarities.get_similarities() emits metrics for it.
MIN_INTERSECTION = 20
# Not referenced anywhere in this file.
MIN_TOPICS = 20
def fill_postcounts(v1, v2):
    '''
    Given two dicts of postcounts keyed by topic, return a pair of filled
    dicts in which every topic present in only one input appears in the
    other with a count of zero, so that both results share the same keys.

    The inputs are not modified; new dicts are returned.

    Returns (filled_v1, filled_v2).
    '''
    # Work on shallow copies so the caller's dicts are left untouched.
    filled1 = dict(v1)
    filled2 = dict(v2)
    # setdefault only inserts the zero when the topic is missing.
    for topic in v2:
        filled1.setdefault(topic, 0)
    for topic in v1:
        filled2.setdefault(topic, 0)
    return (filled1, filled2)
def dotProduct(v1, v2):
    '''
    Sum of v1[key] * v2[key] over every key of v1.
    v2 must contain every key of v1 (e.g. both vectors already filled).
    '''
    return sum(v1[key] * v2[key] for key in v1)
def normSq(v1):
    '''
    Squared Euclidean norm of a vector stored as a dict: the sum of the
    squares of its values.
    '''
    return sum(v1[key] ** 2 for key in v1)
# Put metrics here.
def correlation(size, dotProd, postCountSum, postCount2Sum, postCountNormSq, postCount2NormSq):
    '''
    Pearson correlation of two vectors, computed from summary statistics:

        (n*sum(xy) - sum(x)*sum(y))
        / sqrt((n*sum(x^2) - sum(x)^2) * (n*sum(y^2) - sum(y)^2))

    size is n, dotProd is sum(xy), postCountSum / postCount2Sum are the
    element sums and postCountNormSq / postCount2NormSq the sums of squares.
    Returns 0 when the denominator is zero (at least one vector is constant).
    '''
    covarianceTerm = size * dotProd - postCountSum * postCount2Sum
    spread1 = size * postCountNormSq - postCountSum * postCountSum
    spread2 = size * postCount2NormSq - postCount2Sum * postCount2Sum
    scale = (spread1 * spread2) ** 0.5
    # Guard against division by zero for constant vectors.
    if scale == 0:
        return 0
    return float(covarianceTerm) / scale
def correlation_regularized(size, dotProd, postCountSum, postCount2Sum, postCountNormSq, postCount2NormSq, virtualCount, priorCorrelation):
    '''
    Correlation shrunk toward a prior value.

    The plain correlation() result is blended with priorCorrelation using
    the weight size / (size + virtualCount), so small vectors stay close to
    the prior and large vectors stay close to the measured correlation.
    '''
    measured = correlation(
        size, dotProd, postCountSum, postCount2Sum,
        postCountNormSq, postCount2NormSq)
    # Weight given to the measured correlation; the prior gets the remainder.
    weight = float(size) / (size + virtualCount)
    return weight * measured + (1 - weight) * priorCorrelation
def jaccard(commonTopics, totalTopics1, totalTopics2):
    '''
    Jaccard similarity from set sizes: |A & B| / |A | B|, where the union
    size is derived as totalTopics1 + totalTopics2 - commonTopics.
    Returns 0 when the union is empty.
    '''
    unionSize = totalTopics1 + totalTopics2 - commonTopics
    return float(commonTopics) / unionSize if unionSize != 0 else 0
def cosine(dotProd, postCountNorm, postCount2Norm):
    '''
    Cosine similarity: the dot product divided by the product of the two
    vector norms. Returns 0 when either norm is zero.
    '''
    normProduct = postCountNorm * postCount2Norm
    if normProduct == 0:
        return 0
    return float(dotProd) / normProduct
# Add new metrics to the function below.
def calculate_metrics(v1, v2):
    '''
    Compute similarity metrics between two unfilled postcount vectors
    (dicts mapping a key to a count).

    Returns [correlation, regularized correlation, Jaccard, cosine].
    '''
    # Sizes of the sparse vectors must be captured before filling, because
    # the Jaccard metric is based on which keys each vector originally had.
    nonZero1 = len(v1)
    nonZero2 = len(v2)
    overlap = len(set(v1.keys()) & set(v2.keys()))

    # Align both vectors on the same key set.
    filled1, filled2 = fill_postcounts(v1, v2)

    # Summary statistics shared by several metrics.
    size = len(filled1)
    dotProd = dotProduct(filled1, filled2)
    sum1 = sum(filled1.values())
    sum2 = sum(filled2.values())
    normSq1 = normSq(filled1)
    normSq2 = normSq(filled2)

    return [
        correlation(size, dotProd, sum1, sum2, normSq1, normSq2),
        correlation_regularized(
            size, dotProd, sum1, sum2, normSq1, normSq2,
            PRIOR_COUNT, PRIOR_CORRELATION),
        jaccard(overlap, nonZero1, nonZero2),
        cosine(dotProd, normSq1 ** 0.5, normSq2 ** 0.5),
    ]
class MRUserSimilarities(MRJob):
    '''
    Two-step MapReduce job producing similarity metrics for pairs of users.

    Step 1 groups post counts by topic and, for each topic, emits the post
    counts of every pair of users that both appear in it.
    Step 2 groups those count pairs by user pair and runs calculate_metrics()
    on them.

    Known limitation: a topic contributes to a user pair only when both users
    appear in that topic's input, so topics where only one of the two users
    has posted are disregarded.
    '''

    def __init__(self, *args, **kwargs):
        super(MRUserSimilarities, self).__init__(*args, **kwargs)
        # Not read by any step in this class; kept so the attribute remains
        # available to code that expects it.
        self.users = {}

    def get_postcounts(self, key, line):
        '''
        Takes file input of the form userID\ttopicID\tpostCount
        Returns (topicID, (userID, postCount))
        Blank lines are skipped instead of raising IndexError.
        '''
        fields = line.split()
        if not fields:
            return
        yield int(fields[1]), (int(fields[0]), int(fields[2]))

    def assemble_topic_postcounts(self, topicID, values):
        '''
        Takes topicID as key and a list of (userID, postCount) tuples as values
        Returns (userID1,userID2),(postCount1,postCount2) for every pair of
        users in the topic with userID1 < userID2.
        '''
        usersPosts = dict(values)
        # TODO: figure out a way to also cover topics where only one user of
        # a pair has posted; such topics never produce a pair here.
        for userID1, postCount1 in usersPosts.items():
            for userID2, postCount2 in usersPosts.items():
                # Emit each unordered pair once, smaller userID first.
                if userID2 > userID1:
                    yield (userID1, userID2), (postCount1, postCount2)

    def pass_counts(self, userIDPair, postCountPair):
        '''
        Identity mapper: forwards each (userIDPair, postCountPair) unchanged
        so the second step can group by user pair.
        '''
        yield userIDPair, postCountPair

    def get_similarities(self, userIDPair, postCounts):
        '''
        Takes a userID pair as key and an iterable of
        (postCount1, postCount2) pairs, one per emitted topic, as values.
        Returns the userID pair with its list of similarity metrics.
        Pairs with fewer than MIN_INTERSECTION count pairs emit nothing.
        '''
        postCounts = list(postCounts)
        if len(postCounts) < MIN_INTERSECTION:
            return
        # Topic IDs are not carried into this step, so the position in the
        # list serves as the vector key for both users.
        v1 = dict(enumerate(pair[0] for pair in postCounts))
        v2 = dict(enumerate(pair[1] for pair in postCounts))
        yield userIDPair, calculate_metrics(v1, v2)

    def steps(self):
        '''
        Declares the two map/reduce steps of the job, in execution order.
        '''
        # NOTE(review): self.mr() is the step constructor this file was
        # written against; confirm it is still available in the installed
        # mrjob version before upgrading.
        return [self.mr(mapper=self.get_postcounts,
                        reducer=self.assemble_topic_postcounts),
                self.mr(mapper=self.pass_counts,
                        reducer=self.get_similarities)]
# Script entry point: launch the job only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    MRUserSimilarities.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment