Created
September 17, 2012 16:27
-
-
Save shaldengeki/3738324 to your computer and use it in GitHub Desktop.
A user-similarities MapReduce job. It takes into account topics where only one user out of a pairing has posted.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mrjob.job import MRJob | |
from math import sqrt | |
try:
    from itertools import combinations
except ImportError:
    def combinations(iterable, r):
        """
        Implementation of itertools combinations method.
        Re-implemented here because of import issues
        in Amazon Elastic MapReduce. Just easier to do this than bootstrap.
        More info here:
        http://docs.python.org/library/itertools.html#itertools.combinations

        Input/Output:
        combinations('ABCD', 2) --> AB AC AD BC BD CD
        combinations(range(4), 3) --> 012 013 023 123
        """
        pool = tuple(iterable)
        n = len(pool)
        if r > n:
            # Not enough elements to form a combination; yield nothing.
            return
        # Must be a real list: indices is mutated in place below.
        # (Bare range(r) only works on Python 2, where range returns a list.)
        indices = list(range(r))
        yield tuple(pool[i] for i in indices)
        while True:
            # Find the rightmost index that can still be advanced.
            for i in reversed(range(r)):
                if indices[i] != i + n - r:
                    break
            else:
                # All indices are at their maximum positions; done.
                return
            indices[i] += 1
            # Reset everything to the right of i to consecutive values.
            for j in range(i + 1, r):
                indices[j] = indices[j - 1] + 1
            yield tuple(pool[i] for i in indices)
# Parameters to regularize correlation (defaults; overridable via the
# --priorcount / --priorcorrelation CLI options).
PRIOR_COUNT = 10
PRIOR_CORRELATION = 0

# Filters to speed up computation and reduce noise.
# Subclasses should probably override these, based on actual data.
MIN_NUM_TOPICS = 20
MIN_INTERSECTION = 20
class SemicolonValueProtocol(object):
    """Output protocol that drops the key and joins the values with ';'."""

    # read() is deliberately omitted: this protocol is only ever used
    # for writing job output.
    def write(self, key, values):
        parts = [str(value) for value in values]
        return ';'.join(parts)
class VectorSimilarities(MRJob):
    """
    Three-step mrjob pipeline that computes pairwise user-similarity
    scores from (user, topic, postCount) records.

    Subclasses must implement input() and are expected to provide
    self.user_ids (pairwise_items reads it; UserSimilarities below
    defines it as a user_id -> total-topic-count dict).
    """

    OUTPUT_PROTOCOL = SemicolonValueProtocol

    def steps(self):
        # Step 1: parse input, group per user, re-key by topic.
        # Step 2: no explicit mapper, so mrjob uses an identity mapper;
        #         the reducer aggregates per-topic postings.
        # Step 3: expand each topic's postings into user pairs, then
        #         reduce each pair to similarity scores.
        return [
            self.mr(mapper=self.input, reducer=self.group_by_topic_postCount),
            self.mr(reducer=self.count_postCounts_users_freq),
            self.mr(mapper=self.pairwise_items,
                    reducer=self.calculate_similarity)]

    def configure_options(self):
        # CLI passthrough options; defaults mirror the module-level
        # constants above.
        super(VectorSimilarities, self).configure_options()
        self.add_passthrough_option(
            '--priorcount', dest='prior_count', default=10, type='int',
            help='PRIOR_COUNT: Parameter to regularize correlation')
        self.add_passthrough_option(
            '--priorcorrelation', dest='prior_correlation', default=0,
            type='int',
            help='PRIOR_CORRELATION: Parameter to regularize correlation')
        self.add_passthrough_option(
            '--mintopics', dest='min_num_topics', default=20, type='int',
            help='the minimum number of topics')
        self.add_passthrough_option(
            '--minintersec', dest='min_intersection', default=20, type='int',
            help='the minimum intersection')

    def input(self, key, line):
        '''
        Subclasses should override this to define their own input.
        Must yield (user_id, (topic_id, postCount)) pairs — see
        group_by_topic_postCount, which unpacks the values that way.
        '''
        raise NotImplementedError('Implement this in the subclass')

    def group_by_topic_postCount(self, key, values):
        """
        Re-key by topic: for each user (key) with at least
        min_num_topics topics, emit one (topic_id, (user, postCount,
        total_topics)) record per topic.

        17 70,3
        35 21,1
        49 19,2
        49 21,1
        49 70,4
        87 19,1
        87 21,2
        98 19,2
        """
        total_topics = 0
        final = []
        # Materialize the user's postings first so we know the total
        # topic count before deciding whether to emit anything.
        for topic_id, postCount in values:
            total_topics += 1
            final.append((topic_id, postCount))
        # Drop low-activity users to cut noise and data volume.
        if total_topics >= self.options.min_num_topics:
            for topic_id, postCount in final:
                yield topic_id, (key, float(postCount), total_topics)

    def count_postCounts_users_freq(self, topic_id, values):
        """
        For each topic, emit a row containing their "postings"
        (user,postCount pairs)
        Also emit topic postCount sum and count for use later steps.

        17 1,3,(70,3)
        35 1,1,(21,1)
        49 3,7,(19,2 21,1 70,4)
        87 2,3,(19,1 21,2)
        98 1,2,(19,2)
        """
        user_count = 0
        user_sum = 0
        final = []
        for user_id, postCount, total_topics in values:
            user_count += 1
            user_sum += postCount
            final.append((user_id, postCount, total_topics))
        yield topic_id, (user_count, user_sum, final)

    def pairwise_items(self, topic_id, values):
        '''
        The output drops the topic from the key entirely, instead it emits
        the pair of users as the key:
        userID1,userID2 postCount1,postCount2,totalTopics1,totalTopics2

        19,21 2,1
        19,70 2,4
        21,70 1,4
        19,21 1,2

        This mapper is the main performance bottleneck. One improvement
        would be to create a java Combiner to aggregate the
        outputs by key before writing to hdfs, another would be to use
        a vector format and SequenceFiles instead of streaming text
        for the matrix data.
        '''
        user_count, user_sum, postCounts = values
        # Populate zero values for each user not already represented in
        # postCounts, so pairs where only one user posted still appear.
        # NOTE(review): self.user_ids is supplied by the subclass
        # (UserSimilarities builds it as user_id -> total topic count).
        present_ids = dict([(entry[0], entry[2]) for entry in postCounts])
        for user_id in self.user_ids:
            if user_id not in present_ids:
                postCounts.append((user_id, 0, self.user_ids[user_id]))
        #print user_count, user_sum, [r for r in combinations(postCounts, 2)]
        # bottleneck at combinations
        for item1, item2 in combinations(postCounts, 2):
            # filter out topic entries where both users have not posted.
            if item1[1] != 0 or item2[1] != 0:
                # Order the pair key by user id so (a, b) and (b, a)
                # land on the same reducer key; equal ids emit nothing.
                if item1[0] < item2[0]:
                    yield (item1[0], item2[0]), \
                        (item1[1], item2[1], item1[2], item2[2])
                elif item1[0] > item2[0]:
                    yield (item2[0], item1[0]), \
                        (item2[1], item1[1], item2[2], item1[2])

    def calculate_similarity(self, pair_key, lines):
        '''
        Sum components of each copostCount pair across all topics where
        at least one of user x and user y posted, then calculate pairwise
        pearson similarity and copostCount counts. The similarities are
        normalized to the [0,1] scale because we do a numerical sort.

        19,21 0.4,2
        21,19 0.4,2
        19,70 0.6,1
        70,19 0.6,1
        21,70 0.1,1
        70,21 0.1,1
        '''
        sum_xx, sum_xy, sum_yy, sum_x, sum_y, n, intersection = (0.0, 0.0, 0.0, 0.0, 0.0, 0, 0)
        n_x, n_y = 0, 0
        # NOTE(review): co_postCounts is never used below; user_pair is
        # just an alias for pair_key.
        user_pair, co_postCounts = pair_key, lines
        user_xname, user_yname = user_pair
        for user_x, user_y, nx_count, ny_count in lines:
            sum_xx += user_x * user_x
            sum_yy += user_y * user_y
            sum_xy += user_x * user_y
            sum_y += user_y
            sum_x += user_x
            n += 1
            # Count only topics where BOTH users actually posted.
            if user_x != 0 and user_y != 0:
                intersection += 1
            # Per-user topic totals repeat on every record; the last
            # value seen wins (they are presumably all equal — confirm).
            n_x = int(nx_count)
            n_y = int(ny_count)
        # Skip pairs with too little overlap to be meaningful.
        if intersection >= self.options.min_intersection:
            corr_sim = correlation(n, sum_xy, sum_x, \
                sum_y, sum_xx, sum_yy)
            reg_corr_sim = regularized_correlation(n, sum_xy, sum_x, sum_y, sum_xx, sum_yy, self.options.prior_count, self.options.prior_correlation)
            cos_sim = cosine(sum_xy, sqrt(sum_xx), sqrt(sum_yy))
            jaccard_sim = jaccard(intersection, n_x, n_y)
            yield (user_xname, user_yname), (corr_sim, cos_sim, reg_corr_sim, jaccard_sim, intersection)
def correlation(size, dot_product, postCount_sum, \ | |
postCount2sum, postCount_norm_squared, postCount2_norm_squared): | |
''' | |
The correlation between two vectors A, B is | |
[n * dotProduct(A, B) - sum(A) * sum(B)] / | |
sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] } | |
''' | |
numerator = size * dot_product - postCount_sum * postCount2sum | |
denominator = sqrt(size * postCount_norm_squared - postCount_sum * postCount_sum) * \ | |
sqrt(size * postCount2_norm_squared - postCount2sum * postCount2sum) | |
return (numerator / (float(denominator))) if denominator else 0.0 | |
def jaccard(users_in_common, total_users1, total_users2):
    '''
    The Jaccard Similarity between 2 two vectors
        |Intersection(A, B)| / |Union(A, B)|
    Returns 0.0 when the union is empty.
    '''
    # Inclusion-exclusion: |A u B| = |A| + |B| - |A n B|.
    union_size = total_users1 + total_users2 - users_in_common
    if not union_size:
        return 0.0
    return users_in_common / float(union_size)
def normalized_correlation(size, dot_product, postCount_sum,
                           postCount2sum, postCount_norm_squared,
                           postCount2_norm_squared):
    '''
    The correlation between two vectors A, B is
        cov(A, B) / (stdDev(A) * stdDev(B))
    The normalization is to give the scale between [0,1].
    '''
    raw = correlation(size, dot_product, postCount_sum,
                      postCount2sum, postCount_norm_squared,
                      postCount2_norm_squared)
    # Linearly map Pearson's [-1, 1] range onto [0, 1].
    return (raw + 1.0) / 2.0
def cosine(dot_product, postCount_norm_squared, postCount2_norm_squared):
    '''
    The cosine between two vectors A, B
        dotProduct(A, B) / (norm(A) * norm(B))

    NOTE(review): despite the parameter names, the caller passes the
    norms themselves (sqrt already applied), not the squared norms.
    Returns 0.0 when either norm is zero.
    '''
    denominator = postCount_norm_squared * postCount2_norm_squared
    if not denominator:
        return 0.0
    return dot_product / float(denominator)
def regularized_correlation(size, dot_product, postCount_sum, \
        postCount2sum, postCount_norm_squared, postCount2_norm_squared,
        virtual_cont, prior_correlation):
    '''
    The Regularized Correlation between two vectors A, B
        RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation
    where w = # actualPairs / (# actualPairs + # virtualPairs).

    Shrinks the observed correlation toward the prior when the sample
    size is small relative to virtual_cont.
    '''
    actual = correlation(size, dot_product, postCount_sum,
                         postCount2sum, postCount_norm_squared,
                         postCount2_norm_squared)
    # Weight grows toward 1.0 as the real sample size dominates.
    weight = size / float(size + virtual_cont)
    return weight * actual + (1.0 - weight) * prior_correlation
class TabValueProtocol(object):
    """Output protocol: tab-join the key fields, then the value fields."""

    # read() is deliberately omitted: this protocol is write-only.
    def write(self, key, values):
        if key is None:
            return ""
        key_part = '\t'.join(str(field) for field in key)
        value_part = '\t'.join(str(field) for field in values)
        # Keep the two joins separate (rather than joining one combined
        # list) so empty values still produce a trailing tab.
        return key_part + '\t' + value_part
class UserSimilarities(VectorSimilarities):
    """Concrete job: reads tab-separated (user, topic, postCount) lines
    and a sidecar CSV of per-user topic totals."""

    OUTPUT_PROTOCOL = TabValueProtocol

    def __init__(self, *args, **kwargs):
        super(UserSimilarities, self).__init__(*args, **kwargs)
        # TODO: make this a CLI argument eventually.
        # Context manager fixes the original leak: the file handle was
        # opened and never closed.
        with open('etiPostCounts-2004.tsv_users.csv', 'r') as user_file:
            contents = user_file.read().strip()
        # Entries look like "userID:topicCount", comma-separated; build
        # user_id -> total-topic-count, splitting each entry only once.
        self.user_ids = {}
        for entry in contents.split(","):
            user_id, topic_count = entry.split(":")
            self.user_ids[int(user_id)] = int(topic_count)

    def input(self, key, line):
        """Parse one tab-separated line into (user_id, (topic_id, postCount))."""
        user_id, topic_id, postCount = line.split('\t')
        yield int(user_id), (int(topic_id), int(postCount))
if __name__ == '__main__':
    # mrjob entry point: parses CLI args (including the passthrough
    # options defined in VectorSimilarities) and runs the job.
    UserSimilarities.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment