Skip to content

Instantly share code, notes, and snippets.

@jedahan
Created December 15, 2011 09:20
Show Gist options
  • Select an option

  • Save jedahan/1480481 to your computer and use it in GitHub Desktop.

Select an option

Save jedahan/1480481 to your computer and use it in GitHub Desktop.
GAE Mapreduce
def rebuildordercount():
    """Start an OrderCountPipeline run and return its status URL.

    The pipeline is keyed by the current UTC time formatted as
    hour, month, day, year (``%H%m%d%Y``).

    Returns:
        The pipeline's ``base_path`` followed by ``/status?root=`` and the
        pipeline id.
    """
    # mapreduce over all the orders, replacing old entities
    # NOTE(review): '%m' is the month, not the minute -- confirm that an
    # hour+date key is what is wanted here.
    now = datetime.utcnow()
    filekey = now.strftime('%H%m%d%Y')

    job = OrderCountPipeline(filekey)
    job.start()

    status_url = job.base_path + "/status?root=" + job.pipeline_id
    return status_url
class OrderCountPipeline(base_handler.PipelineBase):
    """Pipeline that runs the "order_count" mapreduce and stores its output.

    Yields a MapreducePipeline over ``models.Order`` entities, then hands
    the result to StoreOutput together with ``filekey``.
    """

    def run(self, filekey):
        """Run the order-count mapreduce, then pass its output on.

        Args:
            filekey: identifier for this run; logged and forwarded to
                StoreOutput.
        """
        logging.info("filename is %s" % filekey)

        # NOTE(review): the mapper is addressed relative to this module
        # (__name__) while the reducer is hard-coded to "main." -- confirm
        # both resolve to the same module at runtime.
        mapper_spec = __name__ + ".order_count_map"
        reducer_spec = "main.order_count_reduce"
        reader_spec = "mapreduce.input_readers.DatastoreInputReader"
        writer_spec = "mapreduce.output_writers.BlobstoreOutputWriter"

        mr_result = yield mapreduce_pipeline.MapreducePipeline(
            "order_count",
            mapper_spec,
            reducer_spec,
            reader_spec,
            writer_spec,
            mapper_params={"entity_kind": "models.Order"},
            reducer_params={"mime_type": "text/plain"},
            shards=16)

        yield StoreOutput("OrderCount", filekey, mr_result)
def order_count_map(order):
    """Map one order to a ``(day_key, '')`` pair.

    ``order.date`` is split on '/', the last component is dropped, and the
    remaining components are joined with ':' to form the key.

    Args:
        order: object exposing a '/'-separated ``date`` string.

    Yields:
        A 2-tuple of the day key and an empty string value.
    """
    day = order.date.split('/')[:-1]
    logging.info("day is %s" % day)
    # str.join accepts exactly one iterable; the empty string is the value
    # half of the emitted (key, value) pair, not a second join argument.
    yield (":".join(day), '')
    #TODO: use op.counters.Increment (yield op.counters.Increment(day, 1))
def order_count_reduce(day, orders):
    """Reduce all values for one day key to a ``"<day>:<count>"`` string.

    Args:
        day: the key string for the group.
        orders: sized collection of the values emitted for ``day``.

    Yields:
        ``day`` and the number of values, joined with ':'.
    """
    # str.join takes a single iterable of strings, so the count must be
    # stringified and both parts passed together as one tuple.
    # NOTE(review): no record separator (e.g. a newline) is appended --
    # confirm whether the output writer needs one between records.
    yield ":".join((day, str(len(orders))))
class StoreOutput(base_handler.PipelineBase):
    """Pipeline stage that re-puts the datastore entity named by a key."""

    def run(self, mr_type, encoded_key, output=None):
        """Log the inputs and, if a key is given, fetch and put its entity.

        Args:
            mr_type: label for the kind of mapreduce that produced the
                output; only logged here.
            encoded_key: encoded datastore key string; when falsy nothing
                is fetched or written.
            output: optional result of the preceding mapreduce stage. It is
                accepted so that three-argument callers do not fail with a
                TypeError; it is only logged.
        """
        logging.info("output is %s %s" % (mr_type, str(encoded_key)))
        if output is not None:
            logging.info("mapreduce output is %s" % str(output))
        if encoded_key:
            # NOTE(review): db.Key(encoded=...) expects an encoded datastore
            # key -- confirm that callers pass one rather than an arbitrary
            # string.
            key = db.Key(encoded=encoded_key)
            m = db.get(key)
            yield op.db.Put(m)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment