Forked from ngcrawford/multiprocessing_template.py
Last active
August 29, 2015 14:02
-
-
Save kcha/afdf9f114bb203cced1b to your computer and use it in GitHub Desktop.
Modified with options. Compatible with Python 2.6.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
import sys | |
import fileinput | |
from optparse import OptionParser, OptionGroup | |
import multiprocessing | |
import time | |
from itertools import izip_longest, izip, repeat | |
def getoptions():
    """Parse command-line options and positional arguments.

    Returns:
        (opts, args) tuple from OptionParser.parse_args(), where
        args[0] is expected to be the input file path.

    Exits with status 1 (via sys.exit) after printing usage when the
    required input file argument is missing.
    """
    usage = "usage: python %prog [options] <input>"
    desc = "Template"
    parser = OptionParser(usage=usage, description=desc)
    parser.add_option('-c', type="int", default=1,
                      dest="cores", metavar="CORES",
                      help="Number of processing cores [%default]")
    parser.add_option('-t', '--test', action="store_true", dest="test",
                      default=False,
                      help="Turn on test mode, which forces script to run "
                           "with a single core")
    (opts, args) = parser.parse_args()
    if len(args) < 1:
        print >> sys.stderr, "Error: missing input file\n"
        parser.print_help()
        # sys.exit() is preferred over the site-provided exit() builtin,
        # and a positive status is portable (exit(-1) maps to 255).
        sys.exit(1)
    return (opts, args)
def process_chunk(d, *args):
    """Placeholder worker: replace with your own per-line processing.

    As written this template does no work and always returns None;
    a None input (chunk padding from grouper) is also answered with None
    so the caller can filter padding out of the results.
    """
    if d is None:
        return None
def grouper(n, iterable, padvalue=None):
    """Collect *iterable* into fixed-length tuples of size *n*.

    The final tuple is padded with *padvalue* when the iterable does not
    divide evenly:

    grouper(3, 'abcdefg', 'x') -->
    ('a','b','c'), ('d','e','f'), ('g','x','x')
    """
    # n references to the SAME iterator advance in lockstep, carving the
    # stream into consecutive groups of n items.
    slots = [iter(iterable)] * n
    return izip_longest(*slots, fillvalue=padvalue)
def process_star(a_b):
    """Unpack an argument tuple into process_chunk.

    Workaround so pool.map (which passes a single argument) can fan out
    multiple arguments per call on Python 2.6; see
    http://stackoverflow.com/a/5443941
    """
    packed_args = tuple(a_b)
    return process_chunk(*packed_args)
if __name__ == '__main__': | |
(opts, cargs) = getoptions() | |
input_file = fileinput.input(cargs[0]) | |
# For debugging: disables multiprocessing | |
if opts.test: | |
print >> sys.stderr, "Running in test mode..." | |
for i in input_file: | |
print process_chunk(i) | |
else: | |
# Create pool (p) | |
p = multiprocessing.Pool(opts.cores) | |
# Use 'grouper' to split test data into | |
# groups you can process without using a | |
# ton of RAM. You'll probably want to | |
# increase the chunk size considerably | |
# to something like 1000 lines per core. | |
# The idea is that you replace 'test_data' | |
# with a file-handle | |
# e.g., testdata = open(file.txt,'rU') | |
#input_file = gzip.open(args[1], 'r') | |
i = 0 | |
for chunk in grouper(100, input_file): | |
tic = time.time() | |
# Use this version if additional arguments are needed to be supplied | |
#results = p.map(process_star, izip(chunk, repeat(argument))) | |
results = p.map(process_chunk, chunk) | |
toc = time.time() | |
print >> sys.stderr, "Chunk %d: %0.2f seconds" % \ | |
(i, float(toc - tic)) | |
for r in results: | |
if r is not None: | |
print r | |
i += 1 | |
input_file.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment