Skip to content

Instantly share code, notes, and snippets.

@teepark
Created March 31, 2011 04:54
Show Gist options
  • Save teepark/895838 to your computer and use it in GitHub Desktop.
Save teepark/895838 to your computer and use it in GitHub Desktop.
A quick MapReduce implementation on greenhouse and junction
import random
from functools import reduce

try:
    import cPickle as pickle
except ImportError:
    import pickle

import greenhouse
import junction
def map_reduce(node, service, mapper, reducer):
    """Submit a MapReduce job to a coordinator on *service* and return its result.

    :param node: junction node used to issue the RPC.
    :param service: junction service name the coordinators listen on.
    :param mapper: one-argument callable applied to every input item; it is
        pickled, so it must be picklable (e.g. a module-level function).
    :param reducer: two-argument folding callable; also pickled.
    :returns: the first entry of the ``map_reduce`` RPC's response list.
    """
    payload = (pickle.dumps(mapper), pickle.dumps(reducer))
    # Routing id 0 selects which coordinator(s) receive the job.
    responses = node.rpc(service, "map_reduce", 0, payload, {})
    return responses[0]
class Coordinator(object):
    """Front end that runs MapReduce jobs on behalf of ``map_reduce`` RPCs.

    For each job it picks a reducer routing id at random, asks that reducer
    to set the job up, broadcasts the pickled mapper on ``map``, and then
    blocks the RPC handler until the reducer publishes the job's ``result``
    back to this coordinator.
    """

    def __init__(self, node, service, mask, value, num_reducers):
        """Register the coordinator's handlers on *node*.

        :param node: junction node used for RPC and publishing.
        :param service: junction service name shared by all participants.
        :param mask: routing mask for this coordinator's subscriptions.
        :param value: routing value for this coordinator; it is also sent to
            the reducer as the routing id to publish the result to.
        :param num_reducers: setup RPCs are routed to a random id in
            ``range(num_reducers)``.
        """
        self._node = node
        self._service = service
        self._value = value
        self._num_reducers = num_reducers
        # (reducer routing id, job_id) -> Event while the job is running,
        # replaced by the job's results once they arrive.
        self._active = {}
        # The RPC handler blocks until its job finishes, hence schedule=True
        # (presumably: each call runs in its own greenlet).
        node.accept_rpc(
            service,
            "map_reduce",
            mask,
            value,
            self.handle_map_reduce,
            schedule=True)
        # handle_result never blocks, so it is not scheduled separately.
        node.accept_publish(
            service,
            "result",
            mask,
            value,
            self.handle_result,
            schedule=False)

    def handle_map_reduce(self, pickled_mapper, pickled_reducer):
        """Run one MapReduce job and return the reducer's final result.

        :param pickled_mapper: pickled callable each mapper applies per item.
        :param pickled_reducer: pickled two-argument folding callable.
        :returns: whatever the reducer publishes as the job's result.
        """
        reducer = random.randrange(self._num_reducers)
        mappers = self._node.publish_receiver_count(self._service, "map", 0)
        job_id = self._node.rpc(
            self._service,
            "setup",
            reducer,
            (pickled_reducer, mappers, self._value),
            {})[0]
        key = (reducer, job_id)
        # Register the waiter BEFORE broadcasting the job: the result may be
        # delivered (and handle_result run) as soon as the map message is out,
        # and handle_result requires this entry to exist.
        waiter = greenhouse.Event()
        self._active[key] = waiter
        try:
            self._node.publish(
                self._service,
                "map",
                0,
                (job_id, pickled_mapper, reducer),
                {})
            waiter.wait()
        except BaseException:
            # Don't leak the registration if publishing or waiting fails.
            self._active.pop(key, None)
            raise
        return self._active.pop(key)

    def handle_result(self, reducer, job_id, results):
        """Deliver a finished job's results and wake its waiting handler.

        :param reducer: routing value reported by the reducer that ran the job.
            NOTE(review): assumed to equal the routing id the coordinator used
            to reach that reducer; otherwise the lookup below fails.
        :param job_id: job id that reducer assigned during setup.
        :param results: the job's reduced value.
        :raises KeyError: if no job with this key is waiting here.
        """
        key = (reducer, job_id)
        ev = self._active[key]
        # Swap the Event for the results; handle_map_reduce pops them once
        # the Event fires.
        self._active[key] = results
        ev.set()
class Reducer(object):
    """Fold mapped values for MapReduce jobs and publish each job's result.

    A coordinator creates a job with a ``setup`` RPC. Mappers then publish
    ``reduce`` messages carrying pages of mapped values, and each page is
    folded into the job's running result with the job's reducer function.
    Once every expected mapper has sent its final page, the result is
    published on ``result`` to the coordinator that set the job up.
    """

    def __init__(self, node, service, mask, value):
        """Register the reducer's handlers on *node*.

        :param node: junction node used for RPC and publishing.
        :param service: junction service name shared by all participants.
        :param mask: routing mask for this reducer's subscriptions.
        :param value: routing value for this reducer; it is sent back with
            every result so the coordinator can identify the job.
        """
        self._node = node
        self._service = service
        self._value = value
        # job_id -> number of mappers that have not yet sent a final page
        self._active = {}
        # job_id -> running reduced value
        self._results = {}
        # job_id -> (coordinator routing id, reducer function)
        self._setup_data = {}
        # next job id to hand out; ids are unique only within this Reducer
        self._counter = 1
        node.accept_rpc(
            service,
            "setup",
            mask,
            value,
            self.handle_setup,
            schedule=False)
        node.accept_publish(
            service,
            "reduce",
            mask,
            value,
            self.handle_reduce,
            schedule=True)

    def handle_setup(self, pickled_reducer, mapper_count, coordinator_id):
        """Create a job and return its id.

        :param pickled_reducer: pickled two-argument folding function.
        :param mapper_count: number of mappers expected to send a final page.
            NOTE(review): with 0 the job never completes, because completion
            is only checked after a final page decrements this count.
        :param coordinator_id: routing id the job's result is published to.
        :returns: the new job id.
        """
        # SECURITY: pickle.loads can execute arbitrary code; every peer on
        # this service must be trusted. Unpickled once here, not per page.
        reducer_func = pickle.loads(pickled_reducer)
        job_id = self._counter
        self._counter += 1
        self._setup_data[job_id] = (coordinator_id, reducer_func)
        self._active[job_id] = mapper_count
        return job_id

    def handle_reduce(self, job_id, results, final):
        """Fold one page of mapped values into a job's running result.

        :param job_id: id returned by ``handle_setup`` for this job.
        :param results: list of mapped values from one mapper (may be empty).
        :param final: true if this is the sending mapper's last page.

        When the last expected final page arrives, the job's state is dropped
        and ``(value, job_id, result)`` is published on ``result`` to the
        job's coordinator. ``result`` is ``None`` if no values were received.
        """
        coordinator, reducer_func = self._setup_data[job_id]
        if job_id in self._results:
            self._results[job_id] = reduce(
                reducer_func, results, self._results[job_id])
        elif results:
            # reduce() without an initial value raises TypeError on an empty
            # sequence, so an empty first page is simply skipped.
            self._results[job_id] = reduce(reducer_func, results)

        if not final:
            return
        self._active[job_id] -= 1
        if self._active[job_id]:
            return

        del self._active[job_id]
        del self._setup_data[job_id]
        self._node.publish(
            self._service,
            "result",
            coordinator,
            (self._value, job_id, self._results.pop(job_id, None)),
            {})
class Mapper(object):
    """Apply each job's mapper to local data and stream output to a reducer.

    Every Mapper subscribes to all ``map`` messages on *service*. For each
    job it maps every item from its data source and publishes the mapped
    values on ``reduce`` to the job's reducer, in pages of at most
    ``page_size`` values. The last page is flagged ``final=True``.
    """

    # Maximum number of mapped values sent in one ``reduce`` message.
    page_size = 64

    def __init__(self, node, service, data_source, reducer_count):
        """Register the ``map`` handler on *node*.

        :param node: junction node used for publishing.
        :param service: junction service name shared by all participants.
        :param data_source: zero-argument callable returning an iterable of
            input items; called once per job.
        :param reducer_count: stored but not used by this class.
        """
        self._node = node
        self._service = service
        self._datasource = data_source
        self._num_reducers = reducer_count
        # mask 0 / value 0: receive every ``map`` broadcast.
        node.accept_publish(
            service, "map", 0, 0, self.handle_map, schedule=True)

    def handle_map(self, job_id, pickled_mapper, reducer_id):
        """Map the local data for one job and send the results in pages.

        :param job_id: job id assigned by the reducer.
        :param pickled_mapper: pickled one-argument callable applied per item.
        :param reducer_id: routing id of the reducer collecting this job.
        """
        items = self._datasource()
        # SECURITY: pickle.loads can execute arbitrary code; every peer on
        # this service must be trusted.
        map_func = pickle.loads(pickled_mapper)
        page = []
        for item in items:
            # Flush a full page only once another item is known to follow, so
            # pages hold exactly page_size values and the final page is never
            # empty (unless the data source yields nothing at all).
            if len(page) >= self.page_size:
                self._send_page(job_id, page, reducer_id, False)
                page = []
                # Let other greenlets run between pages.
                greenhouse.pause()
            page.append(map_func(item))
        self._send_page(job_id, page, reducer_id, True)

    def _send_page(self, job_id, page, reducer_id, final):
        """Publish one page of mapped values to the job's reducer."""
        self._node.publish(
            self._service,
            "reduce",
            reducer_id,
            (job_id, page, final),
            {})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment