Created July 22, 2014 11:46
-
-
Save metafeather/7cc0646c13a545e2d137 to your computer and use it in GitHub Desktop.
MapReduce in Python from JSON via stdin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# MAPPER | |
# ref: http://open.blogs.nytimes.com/2014/07/10/emr-streaming-in-go/ | |
import sys | |
import simplejson as json | |
def main():
    """Map JSON log records read on stdin to tab-separated key/JSON lines.

    Each non-blank input line is parsed as a JSON object. The fields
    data.key, data.another-key, data.metric and data.metric-2 are copied
    into a new object under the names "key", "secondary-key",
    "first-metric" and "second-metric". For every record that parses, one
    line of the form <key><TAB><json> is written to stdout. Records that
    fail to parse or lack one of the fields are reported on stderr (one
    message per line) and skipped.
    """
    for line in sys.stdin:
        line = line.strip()
        if not line:
            # blank lines (e.g. a trailing newline) are not records
            continue
        try:
            j = json.loads(line)
            data = j["data"]
            # grab the identifier plus any other useful information
            output = {
                "key": data["key"],
                "secondary-key": data["another-key"],
                "first-metric": data["metric"],
                "second-metric": data["metric-2"],
            }
        except (ValueError, KeyError, TypeError) as e:
            # ValueError: malformed JSON; KeyError: missing field;
            # TypeError: a JSON value is not an object where one is expected
            sys.stderr.write("unable to read log: %s\n" % e)
            continue
        try:
            output_json = json.dumps(output)
            # print() with a single argument behaves the same on Python 2 and 3
            print("%s\t%s" % (output["key"], output_json))
        except (TypeError, ValueError) as e:
            # ValueError also covers UnicodeEncodeError when writing the line
            sys.stderr.write("unable to write mapper output: %s\n" % e)


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# REDUCER | |
# ref: http://open.blogs.nytimes.com/2014/07/10/emr-streaming-in-go/ | |
import sys | |
import simplejson as json | |
def main():
    """Sum per-key metrics from mapper output read on stdin.

    Each input line is expected to be <key><TAB><json object>, as written
    by the mapper. The first record seen for a key is kept as-is. The
    "first-metric" and "second-metric" fields of every later record with
    the same key are added onto it. After all input is consumed, one line
    of the form <key><TAB><json> per key is written to stdout. Lines that
    cannot be parsed or aggregated are reported on stderr (one message per
    line) and skipped without changing any running total.
    """
    # aggregated record per key; all keys are held in memory, so the input
    # does not need to be sorted by key
    output = dict()
    for line in sys.stdin:
        # split on the first tab only, keeping the JSON payload intact
        key, sep, payload = line.rstrip("\n").partition("\t")
        if not sep:
            sys.stderr.write("unable to parse reducer input: missing tab separator\n")
            continue
        try:
            data = json.loads(payload)
            if key in output:
                ongoing_count = output[key]
                # compute both sums before storing either, so a bad record
                # cannot leave the running totals half-updated
                first = ongoing_count["first-metric"] + data["first-metric"]
                second = ongoing_count["second-metric"] + data["second-metric"]
                ongoing_count["first-metric"] = first
                ongoing_count["second-metric"] = second
            else:
                # first time we've seen this key: keep the record as-is
                output[key] = data
        except (ValueError, KeyError, TypeError) as e:
            # ValueError: malformed JSON; KeyError: missing metric;
            # TypeError: non-object record or non-numeric metric
            sys.stderr.write("unable to parse reducer input: %s\n" % e)
            continue
    # once all lines are read, emit one line per key (items() yields pairs;
    # iterating the dict directly would yield only the key strings)
    for key, value in output.items():
        try:
            output_json = json.dumps(value)
            # print() with a single argument behaves the same on Python 2 and 3
            print("%s\t%s" % (key, output_json))
        except (TypeError, ValueError) as e:
            sys.stderr.write("unable to create reducer json: %s\n" % e)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.