Created
March 5, 2011 04:28
-
-
Save laclefyoshi/856112 to your computer and use it in GitHub Desktop.
MapReduce with Akka/Actor and Jython
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright : (c) SAEKI Yoshiyasu
# License : MIT-style license
# <http://www.opensource.org/licenses/mit-license.php>
# last updated: 2011/03/04
from akka.actor import Actors, UntypedActor | |
# from akka.dispatch import Futures | |
import collections | |
import string | |
class MapReduceActor(UntypedActor):
    """Worker actor that applies a function to one piece of input data.

    Every message is expected to be a ``(func, input_data)`` pair. The actor
    calls ``func(input_data)`` and hands the return value to
    ``self.getContext().reply``.
    """

    def onReceive(self, message):
        """Run the function carried by *message* and reply with its result.

        Args:
            message: a two-item sequence ``(func, input_data)``.
        """
        # Unpack in the body rather than in the signature: tuple parameters
        # are Python-2-only syntax (removed by PEP 3113). Callers still pass
        # a single positional message, so the interface is unchanged.
        func, input_data = message
        result = func(input_data)
        self.getContext().reply(result)
class MapReduce(object):
    """Map / partition / reduce pipeline executed by a pool of worker actors.

    Calling an instance with an iterable of inputs:

    1. creates and starts ``numWorkers`` ``MapReduceActor`` workers,
    2. applies ``mapFunc`` to every input item (one request per item),
    3. groups the emitted ``(key, value)`` pairs by key,
    4. applies ``reduceFunc`` to every ``(key, [values])`` pair,
    5. stops the workers and returns the list of reduce results.
    """

    def __init__(self, mapFunc, reduceFunc, numWorkers=4, timeout=180 * 1000):
        """Store the pipeline configuration; no actors are created yet.

        Args:
            mapFunc: callable applied to each input item; must return an
                iterable of ``(key, value)`` pairs (see ``partition``).
            reduceFunc: callable applied to each ``(key, values)`` pair.
            numWorkers: number of worker actors created per call.
            timeout: passed unchanged to ``sendRequestReplyFuture``;
                presumably milliseconds -- confirm against the Akka API.
        """
        self.mapFunc = mapFunc
        self.reduceFunc = reduceFunc
        self.numWorkers = numWorkers
        self.timeout = timeout
        # Populated by __call__; defined here so the attribute always exists.
        self.actors = []

    def __call__(self, input_data):
        """Run the whole pipeline over *input_data* and return the results.

        Returns:
            list of ``reduceFunc`` return values, one per distinct key.
        """
        self.actors = [Actors.actorOf(MapReduceActor)
                       for _ in range(self.numWorkers)]
        started = []
        try:
            for actor in self.actors:
                actor.start()
                started.append(actor)
            mapped_values = self.executeMap(input_data)
            partitioned_data = self.partition(mapped_values)
            return self.executeReduce(partitioned_data.items())
        finally:
            # Always shut down whatever was started, even when a map or
            # reduce step raised, so no worker is left running.
            for actor in started:
                actor.stop()

    def executeMap(self, input_data):
        """Apply ``mapFunc`` to every item of *input_data* via the workers."""
        return self.executeFunc(self.mapFunc, input_data)

    def executeReduce(self, input_data):
        """Apply ``reduceFunc`` to every item of *input_data* via the workers."""
        return self.executeFunc(self.reduceFunc, input_data)

    def executeFunc(self, func, input_data):
        """Send one ``(func, item)`` request per item and gather the replies.

        Items are assigned to workers round-robin (item ``i`` goes to worker
        ``i % numWorkers``). Results are returned in input order.
        """
        futures = []
        for i, data in enumerate(input_data):
            actor = self.actors[i % self.numWorkers]
            # The worker itself is passed as the third argument, exactly as
            # the original code did.
            futures.append(
                actor.sendRequestReplyFuture((func, data), self.timeout, actor))
        for future in futures:
            # ``await`` is a reserved word from Python 3.7 on; looking the
            # method up by name keeps this module parseable there while
            # calling the very same Akka method under Jython.
            getattr(future, "await")()
        return [future.result().get() for future in futures]

    def partition(self, mapped_values):
        """Group mapped ``(key, value)`` pairs by key.

        Args:
            mapped_values: iterable of iterables of ``(key, value)`` pairs,
                i.e. one inner iterable per map call.

        Returns:
            ``collections.defaultdict(list)`` mapping each key to the list of
            its values, in the order they were encountered.
        """
        partitioned_data = collections.defaultdict(list)
        for sublist in mapped_values:
            for key, value in sublist:
                partitioned_data[key].append(value)
        return partitioned_data
# Stop words that file_to_words drops from its output.
# Reference: http://en.wikipedia.org/wiki/Stop_words
# NOTE(review): "amoungst", "fify" and "thickv" look like misspellings
# inherited from the upstream list. They are kept so existing behaviour is
# preserved; the correctly spelled "fifty" and "thick" are added alongside.
_STOP_WORDS = frozenset([
    "a", "about", "above", "across", "after", "afterwards",
    "again", "against", "all", "almost", "alone", "along", "already",
    "also", "although", "always", "am", "among", "amongst", "amoungst",
    "amount", "an", "and", "another",
    "any", "anyhow", "anyone", "anything", "anyway", "anywhere", "are",
    "around", "as", "at", "back", "be", "became",
    "because", "become", "becomes", "becoming", "been", "before",
    "beforehand", "behind", "being", "below", "beside", "besides",
    "between", "beyond", "bill", "both", "bottom", "but", "by", "call",
    "can", "cannot", "cant", "co", "con", "could", "couldnt", "cry", "de",
    "describe", "detail", "do", "done", "down", "due", "during", "each",
    "eg", "eight", "either", "eleven", "else", "elsewhere", "empty",
    "enough", "etc", "even", "ever", "every", "everyone", "everything",
    "everywhere", "except", "few", "fifteen", "fify", "fifty", "fill",
    "find", "fire", "first", "five", "for", "former", "formerly", "forty",
    "found", "four", "from", "front", "full", "further", "get", "give",
    "go", "had", "has", "hasnt", "have", "he", "hence", "her", "here",
    "hereafter", "hereby", "herein", "hereupon", "hers", "herself", "him",
    "himself", "his", "how", "however", "hundred", "ie", "if", "in",
    "inc", "indeed", "interest", "into", "is", "it", "its", "itself",
    "keep", "last", "latter", "latterly", "least", "less", "ltd", "made",
    "many", "may", "me", "meanwhile", "might", "mill", "mine", "more",
    "moreover", "most", "mostly", "move", "much", "must", "my", "myself",
    "name", "namely", "neither", "never", "nevertheless", "next", "nine",
    "no", "nobody", "none", "noone", "nor", "not", "nothing", "now",
    "nowhere", "of", "off", "often", "on", "once", "one", "only", "onto",
    "or", "other", "others", "otherwise", "our", "ours", "ourselves",
    "out", "over", "own", "part", "per", "perhaps", "please", "put",
    "rather", "re", "same", "see", "seem", "seemed", "seeming", "seems",
    "serious", "several", "she", "should", "show", "side", "since",
    "sincere", "six", "sixty", "so", "some", "somehow", "someone",
    "something", "sometime", "sometimes", "somewhere", "still", "such",
    "system", "take", "ten", "than", "that", "the", "their", "them",
    "themselves", "then", "thence", "there", "thereafter", "thereby",
    "therefore", "therein", "thereupon", "these", "they", "thickv", "thick",
    "thin", "third", "this", "those", "though", "three", "through",
    "throughout", "thru", "thus", "to", "together", "too", "top",
    "toward", "towards", "twelve", "twenty", "two", "un", "under",
    "until", "up", "upon", "us", "very", "via", "was", "we", "well",
    "were", "what", "whatever", "when", "whence", "whenever", "where",
    "whereafter", "whereas", "whereby", "wherein", "whereupon",
    "wherever", "whether", "which", "while", "whither", "who", "whoever",
    "whole", "whom", "whose", "why", "will", "with", "within", "without",
    "would", "yet", "you", "your", "yours", "yourself", "yourselves",
])

# Translation table mapping every punctuation character to a space.
# ``string.maketrans`` only exists on Python 2 / Jython; Python 3 moved it to
# ``str.maketrans``. ``or`` short-circuits, so the fallback is never evaluated
# on Python 2.
_PUNCTUATION_TO_SPACE = (getattr(string, "maketrans", None) or str.maketrans)(
    string.punctuation, ' ' * len(string.punctuation))


def file_to_words(filename):
    """Map step: read a text file and emit ``(word, 1)`` for each kept word.

    Punctuation is replaced by spaces, each line is split on whitespace and
    every token is lower-cased. A token is kept only when it is purely
    alphabetic and is not a stop word.

    Args:
        filename: path of the text file to read.

    Returns:
        list of ``(word, 1)`` tuples in file order, one per occurrence.
    """
    output = []
    f = open(filename, 'rt')  # open file with read/text mode
    # try/finally (rather than ``with``) guarantees the handle is closed while
    # staying valid on Jython 2.5, where ``with`` needs a __future__ import.
    try:
        for line in f:
            line = line.translate(_PUNCTUATION_TO_SPACE)  # strip punctuation
            for word in line.split():
                word = word.lower()
                if word.isalpha() and word not in _STOP_WORDS:
                    output.append((word, 1))
    finally:
        f.close()
    return output
def count_words(item):
    """Reduce step: total the occurrence counts collected for one word.

    Args:
        item: a ``(word, occurrences)`` pair, where ``occurrences`` is an
            iterable of numbers (the values grouped under that word).

    Returns:
        ``(word, total)`` where ``total`` is ``sum(occurrences)``.
    """
    word, occurrences = item
    return (word, sum(occurrences))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks so much for posting this. Very helpful to me right now, as I explore the world of Akka and wonder how I can do similar things but in Python. If I can do EXACTLY those things (use all of Akka), I will be truly delighted! Thanks for starting me down an interesting path.