Created
January 3, 2011 19:58
-
-
Save ketralnis/763863 to your computer and use it in GitHub Desktop.
simple parallel sort
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import re | |
import sys | |
import tempfile | |
import multiprocessing | |
from optparse import OptionParser | |
from subprocess import Popen, PIPE, STDOUT | |
debug = False


def status(s, **kw):
    """Emit a diagnostic line on stderr, but only when ``debug`` is true.

    When keyword arguments are supplied, ``s`` is used as a %-style format
    string and interpolated with them (``s % kw``) before being written.
    Nothing is formatted or written while ``debug`` is false.
    """
    if not debug:
        return
    message = s % kw if kw else s
    sys.stderr.write('%s\n' % (message,))
# A number (integer or decimal), an optional binary unit k/m/g, and an
# optional trailing 'b'; e.g. "4096", "512b", "10m", "1.5K", "2GB".
buffer_size_re = re.compile(r'^([0-9]+(?:\.[0-9]*)?|\.[0-9]+)([kmg]?)b?$',
                            re.IGNORECASE)

# Multipliers for the unit suffixes above (powers of 1024, like sort -S).
_UNIT_MULTIPLIERS = {'': 1, 'k': 1024, 'm': 1024 ** 2, 'g': 1024 ** 3}


def parse_size_str(s):
    """Parse a human-readable size string into a whole number of bytes.

    Accepts an integer or decimal number followed by an optional unit
    ``k``, ``m`` or ``g`` (multiples of 1024, case-insensitive) and an
    optional trailing ``b``.  Fractional byte counts are truncated.

    :param s: the size string, e.g. ``'10m'``, ``'1.5k'`` or ``'4096'``.
    :returns: the size in bytes as an ``int``.
    :raises ValueError: if ``s`` is not in a recognised format.
    """
    buffer_match = buffer_size_re.match(s)
    if not buffer_match:
        raise ValueError("Can't parse %r" % (s,))
    number = buffer_match.group(1)
    multiplier = _UNIT_MULTIPLIERS[buffer_match.group(2).lower()]
    if '.' in number:
        # go through float only when needed so large integers stay exact
        return int(float(number) * multiplier)
    return int(number) * multiplier
class PendingWset(object):
    """Bounded pool of chunk-sorting processes plus the final merge.

    ``append`` starts sorting a chunk, first waiting for a slot whenever
    ``num_procs`` sorts are already running.  ``dump_sorted`` waits for all
    outstanding sorts and runs ``sort -m`` over their temp files; that merge
    process writes to this process's stdout.
    """

    def __init__(self, sort_binary, num_procs=1, sort_args=None):
        self.wsets = {}           # pid -> Wset whose sort is still running
        self.finished_wsets = []  # Wsets whose sort exited successfully
        # With fewer than one slot, append() would block forever in
        # os.wait() on a child that is still waiting for its stdin.
        self.num_procs = max(1, num_procs)
        # Copy so neither a caller's list nor a shared default is aliased.
        self.sort_args = list(sort_args) if sort_args is not None else []
        self.sort_binary = sort_binary

    def _reap_finished(self):
        """Move already-exited sorts into ``finished_wsets`` without blocking."""
        for w in list(self.wsets.values()):
            ret = w.poll()
            if ret is None:
                continue
            if ret != 0:
                raise Exception("Sorter completed with an error")
            self.finished_wsets.append(self.wsets.pop(w.pid))

    def append(self, wset):
        """Begin sorting ``wset``, waiting for a free slot if the pool is full.

        :raises Exception: if any sort process exits with a non-zero status.
        """
        if len(self.wsets) >= self.num_procs:
            # cheap non-blocking pass first; only block if nothing has exited
            self._reap_finished()
        while len(self.wsets) >= self.num_procs:
            status("Waiting early for sort process (%d/%d)"
                   % (len(self.wsets), self.num_procs))
            pid, estatus = os.wait()
            if estatus != 0:
                raise Exception("Sort completed with an error")
            if pid not in self.wsets:
                raise Exception("Reaped unknown child process %d" % pid)
            self.finished_wsets.append(self.wsets.pop(pid))
        status("Beginning sort on %r of %d bytes" % (wset, wset.size))
        wset.begin_sort()
        self.wsets[wset.pid] = wset

    def dump_sorted(self):
        """Wait for every chunk sort, then merge all sorted chunks to stdout.

        Every Wset's ``cleanup`` is called afterwards, including when a sort
        fails.

        :returns: the exit status of the ``sort -m`` merge, or 0 when there
            was nothing to merge.
        :raises Exception: if any chunk sort exits with a non-zero status.
        """
        try:
            for wset in self.wsets.values():
                status('Waiting for %r' % wset)
                if wset.wait() != 0:
                    raise Exception("Sort completed with an error")
                self.finished_wsets.append(wset)
            self.wsets = {}
            if not self.finished_wsets:
                # 'sort -m' given no file operands would read stdin instead
                return 0
            cmd = ([self.sort_binary]
                   + self.sort_args
                   + ['-m']
                   + [wset.tf.name for wset in self.finished_wsets])
            status('Starting %r' % cmd)
            # the merge inherits our stdout and writes the result there
            return Popen(cmd).wait()
        finally:
            for wset in self.finished_wsets + list(self.wsets.values()):
                wset.cleanup()
            # the temp files are gone; don't let a second call merge them
            self.finished_wsets = []
class Wset(object):
    """One chunk of input, sorted by its own ``sort`` child process.

    The child is started immediately.  It reads lines written via
    ``append`` on its stdin and writes its output into a
    NamedTemporaryFile (``tf``).  ``cleanup`` closes that file.
    """

    def __init__(self, sort_binary, sort_args=None, temp_dir=None):
        if sort_args is None:
            sort_args = []
        self.tf = tempfile.NamedTemporaryFile(mode='w', dir=temp_dir)
        cmd = [sort_binary] + list(sort_args)
        status("Starting %r to %r" % (cmd, self.tf.name))
        try:
            # NOTE(review): stdin=PIPE is a binary pipe on Python 3, while the
            # caller writes text; this file otherwise targets Python 2.
            self.proc = Popen(cmd, stdin=PIPE, stdout=self.tf)
        except Exception:
            # don't keep the temp file open if the sort binary can't start
            self.tf.close()
            raise
        self.size = 0
        status('Starting %r' % self)

    def __repr__(self):
        return "<Wset(%d, %r)>" % (self.size, id(self))

    @property
    def pid(self):
        """Process id of the sort child."""
        return self.proc.pid

    @property
    def returncode(self):
        """Child's exit status as last observed by Popen (None if unknown)."""
        return self.proc.returncode

    def append(self, line):
        """Write ``line`` to the child's stdin and add its length to ``size``."""
        self.proc.stdin.write(line)
        self.size += len(line)

    def begin_sort(self):
        """Close the child's stdin so it sees EOF; returns the child's pid."""
        status('Starting sort on %r' % self)
        self.proc.stdin.close()
        return self.proc.pid

    def wait(self):
        """Block until the child exits and return its exit status."""
        status('Waiting on %r' % self)
        return self.proc.wait()

    def poll(self):
        """Return the child's exit status, or None if it is still running."""
        return self.proc.poll()

    def cleanup(self):
        """Close the temporary output file."""
        self.tf.close()
def main():
    """Command-line entry point for the parallel sort.

    Reads the input files named on the command line (or stdin) in chunks of
    roughly ``--buffer-size`` bytes, extended to the end of the current
    line.  Each chunk is sorted by its own ``sort`` process, with at most
    ``--num-procs`` running at once, and the sorted chunks are merged to
    stdout.

    :returns: the value of ``PendingWset.dump_sorted()``; the ``__main__``
        guard passes it to ``sys.exit``.
    """
    global debug
    # Otherwise 'sort' applies the locale's collation rules.  The C locale
    # gives plain byte-order comparison.  LC_ALL takes precedence over LANG,
    # so set both.
    os.environ['LANG'] = 'C'
    os.environ['LC_ALL'] = 'C'

    parser = OptionParser()
    parser.add_option('-N', '--num-procs', dest='num_procs',
                      default=multiprocessing.cpu_count(), type='int')
    parser.add_option('--debug', dest='debug',
                      action='store_true', default=False)
    parser.add_option('--binary', dest='sort_binary',
                      default='/usr/bin/sort',
                      help="path to the 'sort' binary. Must accept arguments -T, -S, -m")
    parser.add_option('-S', '--buffer-size', dest='buffer_size',
                      default='10m', help="the buffer size to be used by each sort process")
    parser.add_option('-T', '--temporary-directory', dest='temp_dir', default=None)
    parser.add_option('-n', dest='numeric', action='store_true', default=False)
    parser.add_option('-r', dest='reverse', action='store_true', default=False)
    parser.add_option('-k', dest='sort_key', default=[], action='append')
    options, rest = parser.parse_args()
    if options.num_procs < 1:
        parser.error('--num-procs must be at least 1')

    debug = options.debug
    sort_binary = options.sort_binary

    # Open every input up front so a missing file fails before any sort
    # process has been started.
    if rest:
        files = [open(path) for path in rest]
    else:
        files = [sys.stdin]

    buffer_size = parse_size_str(options.buffer_size)
    sort_args = []
    # extra kilobyte for the remainder of the line that we probably
    # read in the middle of
    sort_args.append('-S%db' % (buffer_size + 1024))
    if options.temp_dir:
        sort_args.append('-T%s' % options.temp_dir)
    if options.numeric:
        sort_args.append('-n')
    if options.reverse:
        sort_args.append('-r')
    for key in options.sort_key:
        # option and value as separate argv entries; '-k ' + key would pass
        # one argument with an embedded space
        sort_args.extend(['-k', key])

    full_wsets = PendingWset(sort_binary=sort_binary,
                             num_procs=options.num_procs,
                             sort_args=sort_args)
    for f in files:
        try:
            while True:
                b = f.read(buffer_size)
                if not b:
                    break
                wset = Wset(sort_binary=sort_binary,
                            sort_args=sort_args,
                            temp_dir=options.temp_dir)
                wset.append(b)
                if not b.endswith('\n'):
                    # finish the line the chunk boundary cut through, so no
                    # line is split across two chunks
                    wset.append(f.readline())
                full_wsets.append(wset)
        finally:
            if f is not sys.stdin:
                f.close()
    return full_wsets.dump_sorted()


if __name__ == '__main__':
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment