@kcha
Forked from ngcrawford/multiprocessing_template.py
Last active August 29, 2015 14:02
Modified with options. Compatible with Python 2.6.
#!/usr/bin/env python
# encoding: utf-8
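# Example invocation (the filename is illustrative; use whatever name this
# script is saved under):
#   python multiprocessing_template.py -c 4 input.txt
#   python multiprocessing_template.py --test input.txt
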
import sys
import fileinput
from optparse import OptionParser, OptionGroup
import multiprocessing
import time
from itertools import izip_longest, izip, repeat


def getoptions():
    usage = "usage: python %prog [options] <input>"
    desc = "Template"
    parser = OptionParser(usage=usage, description=desc)
    parser.add_option('-c', type="int", default=1,
                      dest="cores", metavar="CORES",
                      help="Number of processing cores [%default]")
    parser.add_option('-t', '--test', action="store_true", dest="test",
                      default=False,
                      help="Turn on test mode, which forces the script to "
                           "run with a single core")
    (opts, args) = parser.parse_args()

    if len(args) < 1:
        print >> sys.stderr, "Error: missing input file\n"
        parser.print_help()
        exit(-1)

    return (opts, args)


def process_chunk(d, *args):
    """Replace this with your own function that
    processes data one line at a time."""
    if d is None:
        return None
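# A minimal example replacement (hypothetical, shown only to illustrate the
# expected interface): return each line prefixed with its character count.
# Returning None suppresses output for that line; note that grouper() pads
# the final chunk with None, hence the guard above.
#
# def process_chunk(d, *args):
#     if d is None:
#         return None
#     line = d.rstrip("\n")
#     return "%d\t%s" % (len(line), line)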


def grouper(n, iterable, padvalue=None):
    """grouper(3, 'abcdefg', 'x') -->
    ('a','b','c'), ('d','e','f'), ('g','x','x')"""
    return izip_longest(*[iter(iterable)] * n, fillvalue=padvalue)
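# Note: [iter(iterable)] * n passes the *same* iterator to izip_longest n
# times, so successive lines are dealt into each n-tuple; a short final
# chunk is padded with padvalue (None by default).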


def process_star(a_b):
    """
    Workaround for supplying multiple arguments to pool.map in Python 2.6:
    http://stackoverflow.com/a/5443941
    """
    return process_chunk(*a_b)
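# pool.map only accepts single-argument functions, so extra arguments are
# zipped alongside each line and unpacked here. A sketch (extra_arg stands
# in for whatever your process_chunk expects):
#   results = p.map(process_star, izip(chunk, repeat(extra_arg)))
# which calls process_chunk(line, extra_arg) for every line in the chunk.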


if __name__ == '__main__':
    (opts, cargs) = getoptions()

    input_file = fileinput.input(cargs[0])
    # To read gzipped input instead (requires "import gzip"):
    # input_file = gzip.open(cargs[0], 'r')

    if opts.test:
        # For debugging: disables multiprocessing
        print >> sys.stderr, "Running in test mode..."
        for i in input_file:
            r = process_chunk(i)
            if r is not None:
                print r
    else:
        # Create pool (p)
        p = multiprocessing.Pool(opts.cores)

        # 'grouper' splits the input into fixed-size chunks so the whole
        # file never has to sit in RAM at once. You'll probably want to
        # increase the chunk size considerably, to something like 1000
        # lines per core.
        i = 0
        for chunk in grouper(100, input_file):
            tic = time.time()
            # Use this version if additional arguments need to be supplied:
            # results = p.map(process_star, izip(chunk, repeat(argument)))
            results = p.map(process_chunk, chunk)
            toc = time.time()
            print >> sys.stderr, "Chunk %d: %0.2f seconds" % (i, toc - tic)
            for r in results:
                if r is not None:
                    print r
            i += 1

        p.close()
        p.join()

    input_file.close()