Created
October 1, 2013 00:41
-
-
Save MattFaus/6772419 to your computer and use it in GitHub Desktop.
A quick experiment with the combiner_spec parameter of the App Engine MapreducePipeline().
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import logging | |
def map(data):
    """Mapper: pair the input record with a random key.

    The name shadows the ``map`` builtin on purpose: the pipeline launched
    elsewhere in this file refers to the function by a dotted-path string
    ending in ``.map``, so it cannot be renamed here.

    Args:
        data: One record handed over by the input reader.

    Yields:
        A single ``(key, data)`` tuple where ``key`` is a random integer in
        the inclusive range 1..10.
    """
    try:
        # Generate a random key from 1..10
        key = random.randint(1, 10)
        logging.info("%s %s", key, data)
        yield (key, data)
    except Exception:
        # Best-effort: log ordinary failures instead of raising. Catching
        # Exception (not a bare except) lets GeneratorExit propagate, so
        # closing the generator early no longer logs a bogus error.
        import traceback
        logging.error(traceback.format_exc())
def combine(key, new_values, old_values):
    """Combiner: emit only values that have not been seen yet for ``key``.

    Args:
        key: The key the values belong to (only used for logging).
        new_values: Iterable of freshly arrived values to combine.
        old_values: Iterable of hashable values that were combined
            previously; used as the initial "already seen" set.

    Yields:
        Each element of ``new_values`` that is neither in ``old_values`` nor
        a repeat of an earlier element of ``new_values``, in input order.
    """
    # NOTE(review): values from old_values are never re-yielded here. Whether
    # the framework keeps them on its own is not visible in this file --
    # confirm against the MapReduce combiner contract.
    try:
        logging.error("%s %s %s", key, new_values, old_values)
        old_values = set(old_values)
        for value in new_values:
            # Remove duplicates
            if value not in old_values:
                old_values.add(value)
                yield value
            else:
                logging.info("Removing duplicate %s", value)
    except Exception:
        # Best-effort: log ordinary failures instead of raising. Catching
        # Exception (not a bare except) lets GeneratorExit propagate, so
        # closing the generator early no longer logs a bogus error.
        import traceback
        logging.error(traceback.format_exc())
def reduce(key, values):
    """Reducer: format every value for ``key`` as one output line.

    Args:
        key: The key being reduced.
        values: Iterable of values collected for ``key``.

    Yields:
        One string per value, formatted as ``"<key> - <value>\\n"``.
    """
    try:
        logging.info("%s %s", key, values)
        # Emit one output line per value: "<key> - <value>"
        for v in values:
            yield "%s - %s\n" % (key, v)
    except Exception:
        # Best-effort: log ordinary failures instead of raising. Catching
        # Exception (not a bare except) lets GeneratorExit propagate, so
        # closing the generator early no longer logs a bogus error.
        import traceback
        logging.error(traceback.format_exc())
class ContentAnalyticsLaunch(ContentAnalyticsQuery):
    """Request handler that starts the "test_combiner" MapReduce pipeline."""

    @user_util.manual_access_checking  # superuser only via app.yaml (/admin)
    def get(self):
        """Build and start the pipeline, then log its pipeline id.

        Any ordinary failure while importing, building, or starting the
        pipeline is logged with its traceback rather than raised.
        """
        try:
            # Imported lazily so a failing import is logged by the handler
            # below instead of breaking module load.
            from third_party import mapreduce

            pipeline = mapreduce.mapreduce_pipeline.MapreducePipeline(
                "test_combiner",
                "content_analytics.handlers.map",
                "content_analytics.handlers.reduce",
                # A debug input_reader provided by the SDK for testing purposes
                "third_party.mapreduce.input_readers.RandomStringInputReader",
                "third_party.mapreduce.output_writers.BlobstoreOutputWriter",
                combiner_spec="content_analytics.handlers.combine",
                mapper_params={
                    "string_length": 1,
                    "count": 500,
                },
                reducer_params={
                    "mime_type": "text/plain",
                },
                shards=16)
            pipeline.start()
            # Logged at error level, as in the original, so the id is easy to
            # spot in the logs.
            logging.error(pipeline.pipeline_id)
        except Exception:
            # Best-effort boundary: report the failure but do not trap
            # SystemExit / KeyboardInterrupt the way a bare except would.
            import traceback
            logging.error(traceback.format_exc())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment