Last active
May 28, 2024 15:10
-
-
Save jquast/3199d9c46de22721e7eb4599287c8d9f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#-u | |
# Python 2.4+ required | |
# by [email protected] | |
# This utility creates a multiplexing i/o pipe between a master process | |
# and multiple child processes. The master process reads in one input file | |
# and delegates equal chunks of those files into each child process. | |
# | |
# The child process script may be a binary program, too. so long as it | |
# reads from standard input! | |
# | |
# For instance, we might have a stat-script.sh: | |
# | |
# #!/bin/sh | |
# while read path; do | |
# stat $path | awk '/^Modify/ {print $2}' | |
# done | |
# | |
# and an input list, | |
# $ find /var > files-var.lst | |
# | |
# we could pipe that input file to the script: | |
# | |
# $ time ./stat-script.sh < files-var.lst | |
# real 0m48.168s | |
# user 0m12.963s | |
# sys 0m58.729s | |
# | |
# however, with multiproc.py, we can pipe this input file to 8 sub-processes, | |
# | |
# $ time ./multiproc.py -i files-var.lst -s ./stat-script.sh -p 8 | |
# | |
# real 0m25.403s | |
# user 0m20.319s | |
# sys 2m1.663s | |
# | |
# and finish in a much shorter time on multi-cpu environments! | |
import os, sys, subprocess, select | |
import getopt, re, shutil, shlex, time, fcntl | |
from n_cpus import n_cpus | |
# improvements over original | |
# * job folder and chunk parts automatically initialized from input file, or re-used on launch (-i) | |
# * job folder chunk parts moved after execution are recognized during runtime | |
# * number of subprocesses can be expressed in relation to the number of CPUs (-z)
# * 1 less subprocess in the processes tree, the target (-s)cript is a directly | |
# managed child process. this also means signal replication is not necessary | |
# * job chunks sorted & striped by their integer value regardless of 0-padding (x1, x20, x100) | |
# * highly accurate ETA and lines/second with --eta | |
# * no '--shell' is necessary, binary executables may be directly used | |
# it is also fully compatible with multiproc job folders: | |
# * default prefix, and chunklength of split-command.sh is emulated | |
# * job state sub-folders and file conventions also emulated | |
# * process chunk striping (#1 gets 1, 4, 8, 12, #2 gets 2, 5, 9, 13) is emulated | |
# * multiple multiprocs on same input folder allowed (with warning) | |
def main(): | |
cpus, origin = n_cpus(2) | |
processes = (cpus -1) | |
chunk_pattern = '([a-zA-Z]+)([0-9]+)' | |
eta=False | |
p_args=[] | |
etaInterval = 1.0 | |
script=None | |
EXIT_POLL=1.0 | |
n_chunks=999 | |
prefix='x' | |
input=None | |
aggregate_output=False | |
splitonly=False | |
bufsize=1 | |
SHORT_OPTS='i:s:a:b:p:z:jeh' | |
LONG_OPTS='input= script= args= processes= bufsize= join eta interval= procsbycpu= nchunks= prefix= chunk_pattern splitonly help'.split() | |
def usage(): | |
sys.stdout.write ( | |
'\n%s -i <path> -s ./prog.exe [-p <n>]\n' % (sys.argv[0],) + \ | |
'\nif input is a file, that file is split into --nchunks(%i) equal parts\n' \ | |
'of an auto-created folder named as {input}.job-{rows}.{chunksize}, which\n' \ | |
'can be re-respecified as input for re-use.\n' % (n_chunks,) + \ | |
'\nsub-folders are created, each chunk is moved to notify its current state:\n' \ | |
' processing/ a subprocess is actively using this chunk\n' \ | |
' processed/ a subprocess has sucesfully completed this chunk\n' \ | |
' failed/ a subprocess returned non-zero or did not fully read input pipe\n' \ | |
' output/ contains a log of each subprocess chunks combined stdout & stderr\n' \ | |
'\nall arguments:\n' \ | |
' -i or --input=<filepath> input filepath or job folder (required)\n' \ | |
' -s or --script=<filepath> program to subprocess & feed input (required)\n' \ | |
' -a or --args=<string> program arguments to send to script (default:None)\n' \ | |
' -p or --processes=<int> number of sub-processes to use (default=%i)\n' % (processes,) + \ | |
' -j or --join (bool) when specified, the output of each subprocess\n' \ | |
' is written to master process stdout & stderr.\n' \ | |
' effectively joining all child process output pipes.\n' \ | |
' -b or --bufsize=<int> subprocess pipe buffer size, 1=line, 0=unbuffered,\n' \ | |
' -1=sysdef, large int needed for high thoroughput.\n' \ | |
' -e or --eta (bool) when specified, write ETA to stderr\n' \ | |
' --interval=<float> how often the eta should be updated, default=%0.1f\n' % (etaInterval,) + \ | |
' -z or --procsbycpu=<str> a limited {op}{val} equation for specifying the\n' \ | |
' number of processes in relation to number of\n' \ | |
" available cpu. fe: '-1','*2' for (n_cpu-1) &\n" \ | |
' (n_cpu * 2) respectively.\n' \ | |
' --nchunks=<int> number of files to divide input.lst into\n' \ | |
" --prefix=<str> alpha file chunk prefix marker (default='%s')\n" % (prefix,) + \ | |
" --chunk_pattern=<str> chunk part regexp (default='%s')\n" % (chunk_pattern,) + \ | |
' --splitonly (bool) split input file if specified and exit.\n') | |
try: | |
opts, args = getopt.getopt \ | |
(args=sys.argv[1:], shortopts=SHORT_OPTS, longopts=LONG_OPTS) | |
except getopt.GetoptError, err: | |
sys.stderr.write ('GetoptError, %s.\n' % (err,)) | |
sys.exit (1) | |
for o, a in opts: | |
if o in ('-i', '--input',): | |
if not a.strip() or not os.path.exists(a): | |
sys.stderr.write ('%s file or folder: %s does not exist.\n' % (o, a,)) | |
sys.exit(1) | |
input=a | |
elif o in ('-s', '--script',): | |
is_exe = lambda p: os.path.exists(p) and os.access(p, os.X_OK) | |
fpath, fname = os.path.split(a) | |
if fpath and not is_exe(a): | |
sys.stderr.write ('%s %s: path is not executable.\n' % (o, a,)) | |
sys.exit(1) | |
elif not fpath and not True in \ | |
[is_exe(os.path.join(p,a)) for p in os.environ["PATH"].split(os.pathsep)]: | |
sys.stderr.write ('%s %s: program not found in PATH, or is not executable\n' % (o, a,)) | |
sys.exit(1) | |
script=a | |
elif o in ('-a', '--args',): | |
p_args = shlex.split(a) | |
elif o in ('-b', '--bufsize',): | |
try: | |
bufsize = int(a) | |
except ValueError, r: | |
sys.stderr.write ('%s %s: ValueError: %s.\n' % (o, a, e,)) | |
sys.exit(1) | |
elif o in ('-p', '--processes',): | |
try: | |
processes = int(a) | |
if processes <= 0: | |
raise ValueError, 'not enough processes for execution' | |
except ValueError, e: | |
sys.stderr.write ('%s %s: ValueError: %s.\n' % (o, a, e,)) | |
sys.exit(1) | |
elif o in ('-z', '--procsbycpu',): | |
try: | |
op, val = a.strip()[0], a.strip()[1:] | |
if val.startswith('='): # allow C-like shorthand, '*=3' | |
val = ''.join(val.split('=')[1:]) | |
val = float(val) # throws ValueError | |
if op == '*': processes = int(cpus *val) | |
elif op == '/': processes = int(cpus /val) | |
elif op == '-': processes = int(cpus -val) | |
elif op == '+': processes = int(cpus +val) | |
else: | |
raise ValueError, "operator %s invalid, must be one of: '*/-+'" (op,) | |
if processes < 1: | |
raise ValueError, 'result less than 1 (%s)' % (processes,) | |
except ValueError, err: | |
sys.stderr.write ('%s %s: ValueError: %s.\n' % (o, a, err)) | |
sys.exit (1) | |
elif o in ('--nchunks',): | |
try: | |
n_chunks = int(a) | |
if n_chunks < 0: | |
raise ValueError, 'not enough input chunks' | |
except ValueError, e: | |
sys.stderr.write ('%s %s: ValueError: %s.\n' % (o, a, e,)) | |
sys.exit(1) | |
elif o in ('--prefix',): | |
prefix=a.strip() | |
if not prefix: | |
sys.stderr.write ('%s %s: ValueError: nil.\n' % (o, prefix, e,)) | |
sys.exit(1) | |
elif False in [c.isalpha() for c in prefix]: | |
sys.stderr.write ('%s %s: ValueError: prefix must be alpha.\n' % (o, prefix, e,)) | |
sys.exit(1) | |
elif o in ('--splitonly'): | |
splitonly = True | |
elif o in ('-j' or '--join'): | |
aggregate_output = True | |
elif o in ('-e' or '--eta'): | |
eta = True | |
elif o in ('--interval'): | |
etaInterval = float(a) | |
elif o in ('-h', '--help',): | |
usage () | |
sys.exit(0) | |
if not input: | |
sys.stderr.write ('[-i|--input] file or folder must be specified.\n') | |
usage() | |
sys.exit(1) | |
if not script: | |
sys.stderr.write ('[-s|--script] argument not supplied or nil.\n') | |
usage() | |
sys.exit(1) | |
try: | |
re_chunk = re.compile(chunk_pattern) | |
except: | |
sys.stderr.write ('Invalid regular expression: %s\n' % (chunk_pattern,)) | |
sys.exit(1) | |
# instead of managing a list of global job chunks and delegating the next | |
# available job to the next spawned process, each process id is pre-destined | |
# to pick a 'stripe' through the set. This is to simulate the original | |
# bash-multiproc behvaior, but not ideal when processing time varies wildly | |
# for each input line, for instance xml transforms on xml files of varying sizes. | |
# TODO: This should be depricated to --stripe ! | |
ischunk = lambda f: re_chunk.match(f) and os.path.isfile(os.path.join(folder,f)) | |
def chunkval(f): | |
match = re_chunk.match(f) | |
if match: | |
prefix, value = match.groups() | |
return int(value) | |
if os.path.isdir(input): | |
folder = input | |
orderedChunks = [file for val, file in sorted([(chunkval(f), f) for f in os.listdir(folder) if ischunk(f)])] | |
if eta: | |
sys.stderr.write ('calculating input size for eta ... ') | |
rows = numRows([os.path.join(input,chunk) for chunk in orderedChunks]) | |
sys.stderr.write ('%i \n' % (rows,)) | |
else: | |
folder, rows = createJobFolder(input, n_chunks, prefix) | |
orderedChunks = [file for val, file in sorted([(chunkval(f), f) for f in os.listdir(folder) if ischunk(f)])] | |
for chk_folder in (os.path.join(folder,'processing'), os.path.join(folder,'failed')): | |
if os.path.exists(chk_folder) and os.path.isdir(chk_folder) and os.listdir (chk_folder): | |
files = sorted(os.listdir(chk_folder)) | |
sys.stderr.write ('error: files in %s/ (%s)\n' % (chk_folder, ','.join(files),)) | |
sys.stderr.write ('\n(m)ove files to %s/ and continue, (c)ontinue, or (e)xit? [e]: ' % (folder,)) | |
opt = sys.stdin.readline().strip().lower() | |
if opt == 'm': | |
for file in files: | |
shutil.move (src=os.path.join(chk_folder,file), dst=folder) | |
elif opt == 'c': | |
None | |
else: | |
sys.exit(0) | |
sys.stderr.write ('\n') | |
stripe = lambda n: [orderedChunks[o-1] for o in range(n+1, len(orderedChunks)+1, processes)] | |
if splitonly: | |
sys.exit(0) | |
# procs is list of child processes, up to --processes | |
procs = [] | |
# Multiprocessing, each 'p' instance is a new subprocess | |
for n in range (processes): | |
p = Multiproc (n, script, p_args, bufsize) | |
try: | |
p.load (folder, [fs for fs in stripe(n) if fs]) | |
procs.append (p) | |
except IndexError: | |
pass # not enough input files for proc? oh well | |
received = lastReceived = 0 # number of lines received by child processes | |
beginTime = lastUpdate = lastUpdate = time.time() | |
# loop until all processes are complete. | |
while [True]*len(procs) != [p.completed for p in procs]: | |
if eta and time.time() - lastUpdate > etaInterval: | |
if received - lastReceived > 0: | |
# calculate lines/second since last update | |
lines_persecond = float(received -lastReceived) / (time.time() - lastUpdate) | |
else: | |
# no change! then calculate global lines/second | |
lines_persecond = float(received) / (time.time() - beginTime) | |
remaining = float(rows -received) | |
outline = '[%s of %s] %0.1f lps, %s eta ' \ | |
% (received, rows, lines_persecond, asctime(remaining / lines_persecond)) | |
sys.stderr.write (outline + len(outline)*'\b') | |
lastUpdate = time.time() | |
lastReceived = received | |
# 'multiplexing': wait up to EXIT_POLL for any pipes available for r/w | |
(ready_read, ready_write, ready_x) = \ | |
select.select ([p.stdout for p in procs if not p.stdout.closed]+[p.stderr for p in procs if not p.stderr.closed], | |
[p.stdin for p in procs if not p.stdin.closed], [], EXIT_POLL) | |
# check stdout pipe for output (line-buffered) | |
for p in [p for p in procs if p.stdout in ready_read]: | |
out = None | |
rr = [p.stdout] | |
while p.stdout in rr and out != '': | |
out = p.stdout.read() | |
if out: | |
p.log (out) | |
if aggregate_output: | |
sys.stdout.write (out) | |
sys.stdout.flush () | |
else: # try again | |
# XXX debug: data waiting on stdout, but not a complete line? | |
# sys.stderr.write ('eof\n') # XXX | |
continue | |
# continue checking for more lines on stdout | |
rr, rw, rx = select.select([p.stdout],[],[],0) | |
out=None # free() | |
(ready_read, ready_write, ready_x) = \ | |
select.select ([p.stdout for p in procs if not p.stdout.closed]+[p.stderr for p in procs if not p.stderr.closed], | |
[p.stdin for p in procs if not p.stdin.closed], [], EXIT_POLL) | |
# check stderr pipe for output (unbuffered) | |
for p in procs: | |
if p.stderr in ready_read: | |
err = p.stderr.read() | |
p.err (err) | |
if aggregate_output: | |
sys.stderr.write (err) | |
err=None # free() | |
# check stdin willingless | |
for p in procs: | |
if p.stdin in ready_write: # and p.poll() != None and not p.stdin.closed: | |
# all conditions check to write to stdin of child process, | |
try: | |
p.stdin.write (p.next()) # write to child process | |
received += 1 | |
except IndexError: | |
# no lines remain on input, close its input pipe | |
p.stdin.close () | |
except IOError: | |
# if the pipe is broken (due to early termination) | |
# we must indicate that the line pushed with .next() was | |
# not received, and decrement the index | |
p.index -= 1 | |
pass | |
# check for sub-process exit | |
for (ret, p) in [(p.poll(), p) for p in procs if not p.completed and p.poll() != None]: | |
# poll for remaining output | |
(ready_read, ready_write, ready_x) = \ | |
select.select ([p.stdout, p.stderr], [], [], 0) | |
# read & record all remaining lines | |
if p.stdout in ready_read: | |
out = p.stdout.read() #'\n'.join(p.stdout.readlines()) | |
p.log (out) | |
if aggregate_output: | |
sys.stdout.write (out) | |
if not p.stdout.closed: | |
p.stdout.close () | |
out=None | |
if p.stderr in ready_read: | |
err = p.stderr.read() #'\n'.join(p.stderr.readlines()) | |
p.err (err) | |
if aggregate_output: | |
sys.stderr.write (err) | |
if not p.stderr.closed: | |
p.stderr.close () | |
err=None | |
p.close () | |
if p.index < p.length: | |
sys.stderr.write ('[%s/%s]: %s pre-maturely exausted (%i/%i)\n' \ | |
% (p.n, p.pid, p.fp_in.name, p.index, p.length)) | |
# XXX | |
received += (p.length -p.index) | |
if ret == 0: | |
ret = -1 # even with a return code of 0, fake a return status | |
# of '-1' so that the chunk is moved to failed/ | |
p.complete (ret) | |
# if any child process returns a non-zero exit status, stop all pipe activity | |
# and wait for up to timeout for user to read errors and chose to exit | |
if ret not in (0,-1): | |
sys.stderr.write ('[%i/%i]: %s returned non-zero: %i.\n' \ | |
% (p.n, p.pid, p.fp_in.name, ret)) | |
sys.stderr.write ('\n') | |
timeout = 10 | |
outline = '' | |
while timeout > -1: | |
sys.stderr.write ('\b' * len(outline)) | |
outline = '(c)ontinue or (e)xit? (%i) [c]: ' % (timeout,) | |
sys.stderr.write (outline) | |
if select.select([sys.stdin], [], [], 1) == ([sys.stdin], [], []): | |
opt = sys.stdin.readline().strip().lower() | |
if opt == 'e': | |
sys.stderr.write ('\n') | |
sys.exit(ret) | |
break | |
timeout -= 1 | |
sys.stderr.write ('\n') | |
# re-investigate available files for stripe | |
files = [fs for fs in stripe(p.n) if fs] | |
# spawn new sub-process on new file chunk | |
n_p = Multiproc(p.n, script, p_args) | |
try: | |
# load remaining files into new process | |
n_p.load (folder, files) | |
except IndexError: | |
sys.stderr.write ('[%i] complete: file stripe exausted.\n' % (p.n,)) | |
continue | |
# replace procs[] reference with new sub-process | |
procs[p.n] = n_p | |
sys.stdout.write ('All processes complete (%i processed in %i seconds, %i lines/second).\n' \ | |
% (received, time.time()-beginTime, received/(time.time()-beginTime))) | |
class Multiproc(subprocess.Popen):
    """ The Multiproc class is derived from subprocess.Popen, with additional methods
    for managing file part chunks and folder states. """
    # class-level defaults, rebound per-instance by load()/loadPart():
    completed=False       # True once complete() has filed the current chunk
    index=0               # lines fed to the child from the current chunk so far
    length=0              # total line count of the current chunk
    chunk=None            # basename of the chunk currently being processed
    files=[]              # remaining chunk basenames of this process' stripe
    fp_in=fp_out=None     # open chunk input file / per-chunk output logfile
    folder=None           # job folder holding chunks and state sub-folders
    def __init__(self, n, script, p_args, bufsize=1):
        # spawn 'script' with arguments p_args as a child process with all
        # three standard streams piped back to us
        self.n = n # our multiproc subprocess id, and our index in global procs[]
        subprocess.Popen.__init__ \
            (self, args=[script] + p_args, bufsize=bufsize, executable=script,
             stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
             close_fds=True, universal_newlines=True)
        # prevent block on .read() for stdin even if select says ready for reading,
        # not actually sure if this occured, this was added when it seemed rsync was
        # blocking on stderr, but in actuality it was probobly the underlying smbfs. XXX
        # -- probobaly occured because of .readline() !
        # for fd in self.stdin, self.stdout, self.stderr; do
        for fd in [self.stdout, self.stderr]:
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    def load(self, folder, inputlist):
        " load in input list of files, must call loadPart() before next() "
        self.folder = folder
        self.files = inputlist
        self.completed = False
        # claim the first chunk immediately; raises IndexError if the
        # stripe is already empty
        self.loadPart (0)
    def loadPart(self, n=0):
        """
        load next chunk of input list as file object fp_in, for line-buffered
        reading by next(), open a pairing logfile named fp_out,
        raises IndexError when all input files are exausted
        """
        # loop until next part is owned, raise IndexError on empty,
        while True:
            self.index = self.length = 0
            try:
                chunk = self.files.pop(n)
            except IndexError, e:
                self.stdin.close() # close process input pipe
                raise IndexError, 'input files exausted: %s' % (e,)
            # another multiproc run (or the operator) may have moved the
            # chunk already; skip missing entries rather than fail
            if not os.path.exists(os.path.join(self.folder,chunk)):
                continue
            self.chunk = chunk
            processing = os.path.join(self.folder, 'processing', chunk)
            # move the chunk into processing/ to claim it
            self.process()
            self.fp_in = open(processing,'r')
            self.length = sum([1 for line in self.fp_in.readlines()]) # seek & sum
            self.fp_in.seek (0) # rewind
            logfile = os.path.join(self.folder, 'output', chunk + '.out')
            if not os.path.exists(os.path.dirname(logfile)):
                os.makedirs(os.path.dirname(logfile))
            self.fp_out = open(logfile, 'w', 1) # open logfile, line-buffered
            break
    def next(self):
        " return next input line, throws IndexError on empty "
        if not self.fp_in.closed:
            line = self.fp_in.readline()
            if line != '':
                self.index += 1
                return line
            self.fp_in.close () # close input file
        raise IndexError, 'input exausted'
    def close(self):
        " close log file output, and input if opened "
        self.fp_out.close ()
        if not self.fp_in.closed:
            self.fp_in.close ()
    def complete(self, ret=0):
        " move input file to processed/ or failed/ folder with return code ret "
        processing = os.path.join(self.folder,'processing',self.chunk)
        if not os.path.exists (processing):
            # NOTE(review): only warns; the shutil.move below will still raise
            # if the source really is gone -- confirm intended behavior
            sys.stdout.write ('[%i/%i]: race condition, src=%s disappeared, cannot move to processed/\n' % (self.n, self.pid, processing,))
        if ret != 0:
            failed = os.path.join(self.folder,'failed',self.chunk)
            if not os.path.exists(os.path.dirname(failed)):
                os.makedirs(os.path.dirname(failed))
            shutil.move (src=processing, dst=failed)
        else:
            processed = os.path.join(self.folder,'processed',self.chunk)
            if not os.path.exists(os.path.dirname(processed)):
                os.makedirs(os.path.dirname(processed))
            shutil.move (src=processing, dst=processed)
        self.completed = True
    def process(self):
        # claim the current chunk by moving it into processing/
        chunkpath = os.path.join(self.folder,self.chunk)
        if not os.path.exists (chunkpath):
            sys.stdout.write ('[%i/%i]: %s race condition, cannot move to processed/\n' \
                % (self.n, self.pid, self.chunk,))
        else:
            processing = os.path.join(self.folder,'processing',self.chunk)
            if not os.path.exists(os.path.dirname(processing)):
                os.makedirs (os.path.dirname(processing))
            shutil.move (src=chunkpath, dst=processing)
        self.completed = False
    def log(self, data):
        # append child output to the per-chunk logfile
        if data:
            self.fp_out.write (data)
            self.fp_out.flush ()
    def err(self, data):
        # log child stderr output and echo it to the master's stderr
        if data:
            self.log (data)
            sys.stderr.write ('[%i/%i]: %s stderr: %s\n' % (self.n, self.pid, self.fp_out.name, data.rstrip()))
def numRows(filepaths):
    " return the total number of lines across all files in filepaths "
    total = 0
    for path in filepaths:
        handle = open(path, 'r')
        # count lines without materializing the whole file in memory
        total += sum(1 for _line in iter(handle.readline, ''))
        handle.close ()
    return total
def createJobFolder (filepath, n_chunks=999, prefix='x'):
    """Split filepath into up to n_chunks chunk files inside a job folder
    named '{filepath}.job-{rows}.{chunksize}' and return (folder, rows).

    An already-existing job folder is re-used with a warning; if the path
    exists but is not a folder, report the conflict and exit non-zero.
    """
    # determine chunksize, we target no more than, but up to n_chunks.
    rows = numRows([filepath])
    # fix: explicit floor division -- identical on python 2, and avoids a
    # float chunksize on python 3
    chunksize = (rows // n_chunks) + 1
    # fix: the old test 'chunksize < 1' was unreachable (floor division is
    # never negative), so the warning below could never fire; test the
    # condition the message actually describes
    if rows < n_chunks:
        chunksize = 1
        sys.stderr.write ('warning: altering chunk size to 1: ' \
                          'input file smaller than (-c|--chunks): %i' % (n_chunks,))
    # determine job folder path for file chunks
    folder = filepath + '.job-%i.%i' % (rows, chunksize)
    if not os.path.isdir(folder) and not os.path.exists(folder):
        # create job folder
        os.makedirs(folder)
        sys.stderr.write ('created folder: %s\n' % (folder,))
        # create input files, a simple gnu-split, with no gnu-split dependency :)
        sys.stderr.write ('splitting %s into %i chunks, %i rows each\n' % (filepath, n_chunks, chunksize))
        fi = open(filepath, 'r')
        # macro to find output filename for index number
        fn_output = lambda idx: '%s%0*i' % (prefix, len(str(n_chunks)), idx)
        n = 1
        while fi and n <= n_chunks:
            # create x001 -> x999 for n_chunks=999
            fo = open(os.path.join(folder, fn_output(n)), 'w')
            changed = False
            for row in range(chunksize):
                line = fi.readline()
                if not line:
                    fo.close()
                    if not changed: # rm unwritten file
                        os.unlink (os.path.join(folder, fn_output(n)))
                    else:
                        sys.stderr.write ('warning: %s incomplete size %i (chunksize=%i)\n' % (fn_output(n), row, chunksize,))
                    fi.close()
                    fi = None
                    break
                else:
                    fo.write (line)
                    changed = True
            if not line:
                break
            fo.close()
            n += 1
    elif os.path.isdir(folder): # exists as a folder
        sys.stderr.write ('warning: job folder already exists: %s\n' \
            'continuing work. specify job folder as input to supress this warning.\n' % (folder,))
    else: # exists, but not as a folder
        # fix: the message was missing its '%' arguments (it printed a
        # literal '%s'), and the error path exited with status 0
        sys.stderr.write ('job folder conflict: %s exists and is not a folder!\n' % (folder,))
        sys.exit(1)
    return folder, rows
def asctime(seconds):
    " format a duration in seconds as a short human-readable eta string "
    # successively factor the duration into each unit; after the loop,
    # remainder holds whole years
    breakdown = []
    remainder = seconds
    for label, size in (('s', 60), ('m', 60), ('h', 24), ('d', 7), ('w', 52)):
        remainder, value = divmod(remainder, size)
        breakdown.append(value)
    breakdown.append(remainder)
    # largest unit first: years, weeks, days, hours, minutes, seconds
    ordered = breakdown[::-1]
    if ordered[0] > 0:
        return '%iy %iw %id' % tuple(ordered[0:3])
    if ordered[1] > 0:
        return '%iw %id %ih' % tuple(ordered[1:4])
    if ordered[2] > 0:
        return '%id %ih %im' % tuple(ordered[2:5])
    if ordered[3] > 0:
        return '%ih %im %is' % tuple(ordered[3:6])
    if ordered[4] > 0:
        return '%im %is' % tuple(ordered[4:6])
    return '%is' % (ordered[5],)
# Script entry point.  NOTE(review): main() resolves n_cpus through the
# 'from n_cpus import n_cpus' import at the top of the file; the n_cpus()
# definition that follows below this guard appears to be a separate
# n_cpus.py module appended to this listing -- confirm file layout.
if __name__ == '__main__':
    main()
# ripped from | |
# http://stackoverflow.com/questions/1006289/how-to-find-out-the-number-of-cpus-in-python | |
def n_cpus(def_ncpus=2):
    """Best-effort detection of the number of CPUs.

    Probes, in order: the multiprocessing module (python 2.6+), POSIX
    sysconf, the Jython java runtime, BSD sysctl, /proc/cpuinfo, and
    finally 'cpuN:' lines in the dmesg boot log.

    Returns a (count, origin) tuple, where origin names the probe that
    succeeded; returns (def_ncpus, <message>) when none works.
    """
    try: # python 2.6+
        import multiprocessing
        return multiprocessing.cpu_count(), 'python2.6-native'
    except (ImportError, NotImplementedError): pass
    try: # POSIX
        res = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if res > 0:
            return res, 'posix SC_NPROCESSORS_ONLN'
    except (AttributeError, ValueError): pass
    try: # jython
        from java.lang import Runtime
        runtime = Runtime.getRuntime()
        res = runtime.availableProcessors()
        if res > 0:
            return res, 'java runtime'
    except ImportError: pass
    try: # BSD
        sysctl = subprocess.Popen(['sysctl', '-n', 'hw.ncpu'], stdout=subprocess.PIPE)
        scStdout = sysctl.communicate()[0]
        res = int(scStdout)
        if res > 0:
            return res, 'BSD sysctl'
    except (OSError, ValueError): pass
    try: # Linux
        res = open('/proc/cpuinfo').read().count('processor\t:')
        if res > 0:
            return res, '/proc/cpuinfo'
    except IOError: pass
    try: # Other UNIXes (heuristic)
        try:
            dmesg = open('/var/run/dmesg.boot').read()
        except IOError:
            dmesgProcess = subprocess.Popen(['dmesg'], stdout=subprocess.PIPE)
            dmesg = dmesgProcess.communicate()[0]
        res = 0
        while '\ncpu' + str(res) + ':' in dmesg:
            res += 1
        if res > 0:
            # fix: every other probe returns (count, origin); the bare
            # 'return res' here broke callers that unpack the result,
            # e.g. 'cpus, origin = n_cpus(2)'
            return res, 'dmesg boot log'
    except OSError: pass
    return def_ncpus, 'Cannot determine n_cpus.'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment