@shaldengeki · Created September 17, 2012
User similarities MapReduce job. Takes into account topics where only one user out of a pairing has posted.
from mrjob.job import MRJob
from math import sqrt

try:
    from itertools import combinations
except ImportError:
    def combinations(iterable, r):
        """
        Implementation of the itertools combinations method.
        Re-implemented here because of import issues in Amazon Elastic
        MapReduce; it's easier to do this than to bootstrap. More info here:
        http://docs.python.org/library/itertools.html#itertools.combinations

        Input/Output:
        combinations('ABCD', 2) --> AB AC AD BC BD CD
        combinations(range(4), 3) --> 012 013 023 123
        """
        pool = tuple(iterable)
        n = len(pool)
        if r > n:
            return
        indices = list(range(r))
        yield tuple(pool[i] for i in indices)
        while True:
            for i in reversed(range(r)):
                if indices[i] != i + n - r:
                    break
            else:
                return
            indices[i] += 1
            for j in range(i + 1, r):
                indices[j] = indices[j - 1] + 1
            yield tuple(pool[i] for i in indices)
# Parameters to regularize correlation.
PRIOR_COUNT = 10
PRIOR_CORRELATION = 0

# Filters to speed up computation and reduce noise.
# Subclasses should probably override these, based on actual data.
MIN_NUM_TOPICS = 20
MIN_INTERSECTION = 20
class SemicolonValueProtocol(object):

    # We don't need to implement read() since we aren't using it.
    def write(self, key, values):
        return ';'.join(str(v) for v in values)
class VectorSimilarities(MRJob):

    OUTPUT_PROTOCOL = SemicolonValueProtocol

    def steps(self):
        return [
            self.mr(mapper=self.input,
                    reducer=self.group_by_topic_postCount),
            self.mr(reducer=self.count_postCounts_users_freq),
            self.mr(mapper=self.pairwise_items,
                    reducer=self.calculate_similarity)]

    def configure_options(self):
        super(VectorSimilarities, self).configure_options()
        self.add_passthrough_option(
            '--priorcount', dest='prior_count', default=10, type='int',
            help='PRIOR_COUNT: parameter to regularize correlation')
        self.add_passthrough_option(
            '--priorcorrelation', dest='prior_correlation', default=0,
            type='int',
            help='PRIOR_CORRELATION: parameter to regularize correlation')
        self.add_passthrough_option(
            '--mintopics', dest='min_num_topics', default=20, type='int',
            help='the minimum number of topics a user must have posted in')
        self.add_passthrough_option(
            '--minintersec', dest='min_intersection', default=20, type='int',
            help='the minimum number of co-posted topics for a user pair')
    def input(self, key, line):
        '''
        Subclasses should override this to define their own input.
        '''
        raise NotImplementedError('Implement this in the subclass')
    def group_by_topic_postCount(self, key, values):
        """
        Regroup each user's records by topic: for every user (the key) who
        has posted in at least min_num_topics topics, emit one record per
        topic, keyed by topic_id, with value (user_id, postCount,
        total_topics). Sample output (topic_id, then user_id,postCount):

        17  70,3
        35  21,1
        49  19,2
        49  21,1
        49  70,4
        87  19,1
        87  21,2
        98  19,2
        """
        total_topics = 0
        final = []
        for topic_id, postCount in values:
            total_topics += 1
            final.append((topic_id, postCount))
        if total_topics >= self.options.min_num_topics:
            for topic_id, postCount in final:
                yield topic_id, (key, float(postCount), total_topics)
    def count_postCounts_users_freq(self, topic_id, values):
        """
        For each topic, emit a row containing its "postings"
        (user, postCount pairs). Also emit the topic's user count and
        postCount sum for use in later steps.

        17  1,3,(70,3)
        35  1,1,(21,1)
        49  3,7,(19,2 21,1 70,4)
        87  2,3,(19,1 21,2)
        98  1,2,(19,2)
        """
        user_count = 0
        user_sum = 0
        final = []
        for user_id, postCount, total_topics in values:
            user_count += 1
            user_sum += postCount
            final.append((user_id, postCount, total_topics))
        yield topic_id, (user_count, user_sum, final)
    def pairwise_items(self, topic_id, values):
        '''
        The output drops the topic from the key entirely; instead it emits
        the pair of users as the key:

        userID1,userID2  postCount1,postCount2,totalTopics1,totalTopics2

        (The sample rows below show only the postCount fields.)

        19,21  2,1
        19,70  2,4
        21,70  1,4
        19,21  1,2

        This mapper is the main performance bottleneck. One improvement
        would be to create a Java combiner to aggregate the outputs by key
        before writing to HDFS; another would be to use a vector format and
        SequenceFiles instead of streaming text for the matrix data.
        '''
        user_count, user_sum, postCounts = values
        # Populate zero values for each user not already represented in
        # postCounts, so that topics where only one user of a pairing has
        # posted still contribute to that pair's statistics.
        present_ids = dict([(entry[0], entry[2]) for entry in postCounts])
        for user_id in self.user_ids:
            if user_id not in present_ids:
                postCounts.append((user_id, 0, self.user_ids[user_id]))
        # Bottleneck at combinations.
        for item1, item2 in combinations(postCounts, 2):
            # Filter out entries where neither user has posted in the topic.
            if item1[1] != 0 or item2[1] != 0:
                if item1[0] < item2[0]:
                    yield (item1[0], item2[0]), \
                        (item1[1], item2[1], item1[2], item2[2])
                elif item1[0] > item2[0]:
                    yield (item2[0], item1[0]), \
                        (item2[1], item1[1], item2[2], item1[2])
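    # Illustration (hedged, made-up numbers): if topic 49's postings are
    # [(19, 2.0, 5), (21, 1.0, 4)] and self.user_ids also contains user 70
    # with 6 total topics, the zero-fill above appends (70, 0, 6), so the
    # pairs (19, 70) and (21, 70) are still emitted with a zero postCount on
    # one side; only pairs where neither user posted are filtered out.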
    def calculate_similarity(self, pair_key, lines):
        '''
        Sum components of each copostCount pair across all topics where at
        least one of the two users has posted, then calculate the pairwise
        Pearson similarity and copostCount counts. The similarities can be
        normalized to the [0, 1] scale (see normalized_correlation) when a
        numerical sort is needed.

        19,21  0.4,2
        21,19  0.4,2
        19,70  0.6,1
        70,19  0.6,1
        21,70  0.1,1
        70,21  0.1,1
        '''
        sum_xx, sum_xy, sum_yy, sum_x, sum_y = 0.0, 0.0, 0.0, 0.0, 0.0
        n, intersection = 0, 0
        n_x, n_y = 0, 0
        user_xname, user_yname = pair_key
        for user_x, user_y, nx_count, ny_count in lines:
            sum_xx += user_x * user_x
            sum_yy += user_y * user_y
            sum_xy += user_x * user_y
            sum_y += user_y
            sum_x += user_x
            n += 1
            if user_x != 0 and user_y != 0:
                intersection += 1
            n_x = int(nx_count)
            n_y = int(ny_count)
        if intersection >= self.options.min_intersection:
            corr_sim = correlation(n, sum_xy, sum_x,
                                   sum_y, sum_xx, sum_yy)
            reg_corr_sim = regularized_correlation(
                n, sum_xy, sum_x, sum_y, sum_xx, sum_yy,
                self.options.prior_count, self.options.prior_correlation)
            cos_sim = cosine(sum_xy, sqrt(sum_xx), sqrt(sum_yy))
            jaccard_sim = jaccard(intersection, n_x, n_y)
            yield (user_xname, user_yname), \
                (corr_sim, cos_sim, reg_corr_sim, jaccard_sim, intersection)
def correlation(size, dot_product, postCount_sum,
                postCount2sum, postCount_norm_squared,
                postCount2_norm_squared):
    '''
    The correlation between two vectors A, B is
      [n * dotProduct(A, B) - sum(A) * sum(B)] /
        sqrt{ [n * norm(A)^2 - sum(A)^2] * [n * norm(B)^2 - sum(B)^2] }
    '''
    numerator = size * dot_product - postCount_sum * postCount2sum
    denominator = \
        sqrt(size * postCount_norm_squared - postCount_sum * postCount_sum) * \
        sqrt(size * postCount2_norm_squared - postCount2sum * postCount2sum)
    return (numerator / float(denominator)) if denominator else 0.0
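# Worked example (illustrative only): for identical vectors A = B = [1, 2, 3],
# size is 3, the dot product is 14, both sums are 6, and both squared norms
# are 14, so correlation(3, 14.0, 6.0, 6.0, 14.0, 14.0) is approximately 1.0.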
def jaccard(users_in_common, total_users1, total_users2):
    '''
    The Jaccard similarity between two sets:
      |Intersection(A, B)| / |Union(A, B)|
    '''
    union = total_users1 + total_users2 - users_in_common
    return (users_in_common / float(union)) if union else 0.0
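# Worked example (illustrative only): for users with 3 and 4 topics, 2 of
# which are shared, the union is 3 + 4 - 2 = 5, so jaccard(2, 3, 4) == 0.4.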
def normalized_correlation(size, dot_product, postCount_sum,
                           postCount2sum, postCount_norm_squared,
                           postCount2_norm_squared):
    '''
    The correlation between two vectors A, B is
      cov(A, B) / (stdDev(A) * stdDev(B))
    The normalization maps the result from [-1, 1] onto the [0, 1] scale.
    '''
    similarity = correlation(size, dot_product, postCount_sum,
                             postCount2sum, postCount_norm_squared,
                             postCount2_norm_squared)
    return (similarity + 1.0) / 2.0
def cosine(dot_product, postCount_norm, postCount2_norm):
    '''
    The cosine between two vectors A, B:
      dotProduct(A, B) / (norm(A) * norm(B))
    Note: the caller passes the norms themselves (the square roots of the
    squared norms), so the parameters are named accordingly.
    '''
    numerator = dot_product
    denominator = postCount_norm * postCount2_norm
    return (numerator / float(denominator)) if denominator else 0.0
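# Worked example (illustrative only): for A = B = [1, 2, 3] the dot product
# is 14 and both norms are sqrt(14), so cosine(14.0, sqrt(14), sqrt(14)) is
# approximately 1.0.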
def regularized_correlation(size, dot_product, postCount_sum,
                            postCount2sum, postCount_norm_squared,
                            postCount2_norm_squared,
                            virtual_cont, prior_correlation):
    '''
    The regularized correlation between two vectors A, B:
      RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation
    where w = # actualPairs / (# actualPairs + # virtualPairs).
    '''
    unregularized_correlation = correlation(size, dot_product, postCount_sum,
                                            postCount2sum,
                                            postCount_norm_squared,
                                            postCount2_norm_squared)
    w = size / float(size + virtual_cont)
    return w * unregularized_correlation + (1.0 - w) * prior_correlation
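# Worked example (illustrative only): with 20 actual pairs and the default
# virtual count of 10, w = 20 / 30, so an actual correlation of 0.9 with a
# prior correlation of 0 shrinks to approximately 0.6.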
class TabValueProtocol(object):

    # We don't need to implement read() since we aren't using it.
    def write(self, key, values):
        if key is not None:
            return '\t'.join(str(k) for k in key) + '\t' + \
                '\t'.join(str(v) for v in values)
        else:
            return ""
class UserSimilarities(VectorSimilarities):

    OUTPUT_PROTOCOL = TabValueProtocol

    def __init__(self, *args, **kwargs):
        super(UserSimilarities, self).__init__(*args, **kwargs)
        # Make this a CLI argument eventually.
        with open('etiPostCounts-2004.tsv_users.csv', 'r') as user_file:
            user_file_contents = user_file.read().strip()
        # Each comma-separated entry is "user_id:total_topics".
        self.user_ids = dict([(int(entry.split(":")[0]),
                               int(entry.split(":")[1]))
                              for entry in user_file_contents.split(",")])

    def input(self, key, line):
        user_id, topic_id, postCount = line.split('\t')
        yield int(user_id), (int(topic_id), int(postCount))


if __name__ == '__main__':
    UserSimilarities.run()
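# Usage sketch (assumptions: this file is saved as user_similarities.py, the
# input TSV has user_id<TAB>topic_id<TAB>postCount rows, and the user-id CSV
# named above sits in the working directory):
#   python user_similarities.py etiPostCounts-2004.tsv > similarities.tsv
#   python user_similarities.py -r emr etiPostCounts-2004.tsv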