Skip to content

Instantly share code, notes, and snippets.

@metafeather
Created July 22, 2014 11:46
Show Gist options
Save metafeather/7cc0646c13a545e2d137 to your computer and use it in GitHub Desktop.
MapReduce in Python from JSON via stdin
#!/usr/bin/python
# MAPPER
# ref: http://open.blogs.nytimes.com/2014/07/10/emr-streaming-in-go/
import sys
import simplejson as json
def main():
    """Map newline-delimited JSON records on stdin to ``key<TAB>json`` lines.

    Each non-blank input line is parsed as JSON and must contain a ``data``
    object with ``key``, ``another-key``, ``metric`` and ``metric-2`` fields.
    For every such record one line is written to stdout: the record key, a
    tab, then a JSON object with ``key``, ``secondary-key``, ``first-metric``
    and ``second-metric``.  Lines that fail to parse or lack a field are
    reported on stderr (one message per line) and skipped.
    """
    for line in sys.stdin:
        stripped = line.strip()
        if not stripped:
            # Blank lines (e.g. a trailing newline) carry no record.
            continue
        try:
            data = json.loads(stripped)["data"]
            # Keep only the identifier and the fields the reducer needs.
            output = {
                "key": data["key"],
                "secondary-key": data["another-key"],
                "first-metric": data["metric"],
                "second-metric": data["metric-2"],
            }
        except Exception as e:
            # Deliberate per-record boundary: one malformed line must not
            # abort the whole streaming job, so log it and move on.
            sys.stderr.write("unable to read log: %s\n" % e)
            continue
        try:
            output_json = json.dumps(output)
            # sys.stdout.write works under both Python 2 and Python 3,
            # unlike the py2-only print statement.
            sys.stdout.write("%s\t%s\n" % (output["key"], output_json))
        except Exception as e:
            sys.stderr.write("unable to write mapper output: %s\n" % e)


if __name__ == "__main__":
    main()
#!/usr/bin/python
# REDUCER
# ref: http://open.blogs.nytimes.com/2014/07/10/emr-streaming-in-go/
import sys
import simplejson as json
def main():
    """Sum the mapper's metrics per key and emit one ``key<TAB>json`` line each.

    Reads ``key<TAB>json`` lines (the mapper's output format) from stdin.
    The first valid record seen for a key seeds that key's running total.
    The ``first-metric`` and ``second-metric`` values of later records with
    the same key are added to it; every other field keeps the value from the
    first record.  Lines that are blank are skipped.  Lines that are
    malformed, or whose JSON lacks either metric, are reported on stderr and
    skipped without affecting any total.  All keys are held in memory until
    stdin is exhausted; then each key is written to stdout followed by a tab
    and its accumulated record as JSON.
    """
    totals = {}
    for line in sys.stdin:
        if not line.strip():
            continue
        try:
            # Split on the first tab only; everything after it is the value.
            key, raw_value = line.split("\t", 1)
            data = json.loads(raw_value)
            # Read (and, for a repeat key, sum) both metrics before mutating
            # the running total, so a bad record is rejected as a whole
            # instead of being half-applied.
            first = data["first-metric"]
            second = data["second-metric"]
            running = totals.get(key)
            if running is None:
                # First time we've seen this key: its record seeds the total.
                totals[key] = data
            else:
                new_first = running["first-metric"] + first
                new_second = running["second-metric"] + second
                running["first-metric"] = new_first
                running["second-metric"] = new_second
        except Exception as e:
            # Deliberate per-record boundary: log and keep reducing.
            sys.stderr.write("unable to parse reducer input: %s\n" % e)
            continue

    # Once all lines are read, emit the totals. Iterate .items(): iterating
    # the dict itself yields only keys.
    for key, value in totals.items():
        try:
            output_json = json.dumps(value)
        except Exception as e:
            sys.stderr.write("unable to create reducer json: %s\n" % e)
            continue
        # sys.stdout.write works under both Python 2 and Python 3.
        sys.stdout.write("%s\t%s\n" % (key, output_json))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment