Skip to content

Instantly share code, notes, and snippets.

@ob
Created May 10, 2016 18:22
Show Gist options
  • Save ob/5958e201a5698cb09137fa54c0f5359e to your computer and use it in GitHub Desktop.
#!/usr/bin/python -t
#
# Run as:
# $ python ./simulate.py --repoRoot=<dir> --maxCommits=<N> --bk
#
from __future__ import print_function
from multiprocessing import Process, Manager, Condition
import bisect, collections, errno, json, logging, math, optparse, os, pwd
import random, re, shutil, socket, subprocess, sys, time, zlib
from datetime import timedelta, datetime
import dateutil.parser
import parser as pyparser
import tempfile
# subprocess.DEVNULL exists only on Python 3.3+; on older interpreters fall
# back to an open write handle on os.devnull (left open for process lifetime).
try:
    from subprocess import DEVNULL # py3k
except ImportError:
    DEVNULL = open(os.devnull, 'wb')
def which(executable):
    """Search $PATH for 'executable' and return its full path, or None.

    Returns None when 'executable' is empty/None or no match is found.
    FIX: the original computed a 'path' variable and then ignored it,
    re-reading $PATH a second time; it also matched directories because it
    only tested os.X_OK — we now require a regular file as well.
    """
    if not executable:
        return None
    for d in os.environ.get('PATH', '').split(os.pathsep):
        candidate = os.path.join(d, executable)
        # X_OK alone is true for directories too; insist on a real file.
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate
    return None
def canExec(binary):
    """Return True when 'binary' names an existing regular file that the
    current user is allowed to execute."""
    if not os.path.isfile(binary):
        return False
    return os.access(binary, os.X_OK)
def mkdir_p(path):
    """Equivalent of 'mkdir -p': create 'path' and any missing parents,
    silently tolerating a directory that already exists."""
    try:
        os.makedirs(path)
    except OSError as exc:  # Python >2.5
        # Swallow only "already exists as a directory"; re-raise the rest.
        if exc.errno != errno.EEXIST or not os.path.isdir(path):
            raise
def runCmd(cmd, args = []):
    """Run 'cmd' with extra 'args' and return (elapsed, invocations):
    wall-clock seconds spent in the child process(es) and how many times
    the command was launched.

    Three modes, chosen from the shape of the input:
      * cmd ends with '-'      -> feed 'args' to the command on stdin;
      * joined args > 16000 ch -> split args into ~16000-char chunks and
                                  run the command once per chunk;
      * otherwise              -> a single invocation with args appended.
    Raises on any non-zero exit status.
    NOTE(review): args=[] is a shared mutable default; nothing here mutates
    it, but confirm before reusing this helper elsewhere.
    """
    if cmd[-1] == '-':
        # Stdin mode: hand the args to the child, newline separated.
        logging.debug("CMD: STDIN")
        start = time.time()
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = p.communicate('\n'.join(args))
        if p.wait() != 0:
            raise Exception("'%s'' failed:\nSTDOUT:\n%s\nSTDERR:\n%s" %
                            (' '.join(cmd), stdout, stderr))
        elapsed = time.time() - start
        return (elapsed, 1)
    elif len(''.join(args)) > 16000:
        # Chunk mode: keep each command line under ~16000 characters to
        # stay clear of OS argv limits.
        logging.debug("CMD: CHUNKING")
        i = 0        # number of invocations so far
        b = 0        # index of the first arg not yet consumed
        elapsed = 0
        while b < len(args):
            a = b
            chunk = 0
            # Grow the chunk arg-by-arg until the size budget is spent.
            while b < len(args) and chunk < 16000:
                chunk += len(args[b])
                b += 1
            logging.debug(' '.join(cmd + args[a:b]))
            out = tempfile.TemporaryFile()
            err = tempfile.TemporaryFile()
            try:
                start = time.time()
                subprocess.check_call(cmd + args[a:b], stdout=out, stderr=err)
                elapsed += time.time() - start
            except:
                # Dump the child's captured output before re-raising.
                out.flush()
                err.flush()
                out.seek(0)
                err.seek(0)
                strout = out.read()
                strerr = err.read()
                print ("'%s' failed:\nSTDOUT:\n%s\nSTDERR:\n%s" %
                       (' '.join(cmd), strout, strerr))
                raise
            finally:
                out.close()
                err.close()
            i += 1
        return (elapsed, i)
    else:
        # Single-shot mode: everything fits on one command line.
        logging.debug("CMD: ONE")
        logging.debug(' '.join(cmd + args))
        out = tempfile.TemporaryFile()
        err = tempfile.TemporaryFile()
        try:
            start = time.time()
            subprocess.check_call(cmd + args, stdout=out, stderr=err)
            elapsed = time.time() - start
        except:
            # Dump the child's captured output before re-raising.
            out.flush()
            err.flush()
            out.seek(0)
            err.seek(0)
            strout = out.read()
            strerr = err.read()
            print ("'%s' failed:\nSTDOUT:\n%s\nSTDERR:\n%s" %
                   (' '.join(cmd), strout, strerr))
            raise
        finally:
            out.close()
            err.close()
        return (elapsed, 1)
    # Unreachable: every branch above returns.
    assert False
def expbound(lmbd, bound):
    """Generate an exponentially distributed random number with mean 1/lmbd;
    if a draw is >= bound, throw it away and draw again.

    Gives up after 1000 rejected draws and raises Exception.
    FIX: the original had three adjacent strings of which only the first was
    the docstring, printed a literal '%d' (no format args), and formatted the
    error with maxcnt after it had been decremented to 0.
    """
    maxcnt = 1000
    while True:
        i = random.expovariate(lmbd)
        if i < bound:
            break
        maxcnt -= 1
        if maxcnt <= 0:
            msg = 'Could not generate good random number in %d iterations' % (1000,)
            print(msg, file=sys.stderr)
            raise Exception(msg)
    return i
class Simulation(object):
    """
    Driver for SCM operations: holds the simulated repository state
    (file/dir lists, commit counters, timestamps) and delegates the actual
    VCS work to the 'vcs' object passed to __init__.
    """
    # Class-level defaults (documentation of the instance attributes);
    # __init__ gives each instance its own copies of the mutable ones.
    allFiles = []            # Array containing all file names
    allDirs = []             # Array containing all dir names
    filemods = {}            # Hash of filenames -> num mods
    numCommits = 0           # Number of commits in the repo
    ignoredDirs = None       # What directories to ignore
    vcs = None               # VCS object to use
    # Statistics
    rand = None              # random generator
    base = None              # base repo to clone, pull, push
    repoRoot = None          # root of the directory, we always CD here
    name = ''                # name of the scm
    host = ''                # hostname
    hosthash = None          # adler32 of hostname
    subdir = None            # this process's subdir
    user = ''                # username
    fullname = ''            # fullname (from gecos)
    last_commit_ts = None    # timestamp of last commit
    last_checkin_ts = None   # timestamp of last checkin
    proc_num = None          # process number
    cv = None                # Condition()

    def __init__(self, opts, vcs):
        """Prepare the given directory by making sure it exists (mkdir -p)"""
        seed = opts.seed
        root = opts.repoRoot
        rand = opts.rand
        starting = opts.starting_files
        # FIX: give each instance its own mutable state; seedRepo() used to
        # append into the shared class-level lists.
        self.allFiles = []
        self.allDirs = []
        self.filemods = {}
        self.vcs = vcs
        self.name = vcs.name
        self.host = socket.getfqdn().split('.')[0]
        self.hosthash = zlib.adler32(self.host) & 0xffffffff
        self.user = os.getlogin()
        self.fullname = pwd.getpwnam(self.user).pw_gecos
        self.cv = opts.cv
        self.proc_num = opts.proc_num
        self.num_procs = opts.num_procs
        self.next_proc = opts.next_proc
        self.live_procs = opts.live_procs
        if self.live_procs:
            # Mark this process alive in the shared live-process map.
            self.live_procs[self.proc_num - 1] = 1
        random.seed(seed)
        self.rand = rand
        self.repoRoot = os.path.abspath(root)
        if opts.base:
            if not os.path.exists(self.repoRoot):
                logging.debug("Cloning %s" % (opts.base))
                self.vcs.clone(opts.base, self.repoRoot)
            self.base = opts.base
        mkdir_p(self.repoRoot)
        os.chdir(self.repoRoot)
        if opts.subdir:
            self.subdir = opts.subdir
            mkdir_p(self.subdir)
        self.file_ext = opts.new_file_ext or '.f'
        if opts.proc_num:
            # Per-process file extension keeps parallel writers apart.
            self.file_ext = ".f%02d" % (opts.proc_num)
            self.proc_num = opts.proc_num
        if not self.vcs.is_repoRoot('.'):
            # Make new empty repo here
            if self.rand.haveCDF('interarrival'):
                # I originally wanted to start the time at the epoch,
                # but there seems to be some kind of bug where if you
                # start at the epoch it doesn't work. Just work around
                # it by starting on any random date.
                # (FIX: was written 06/23 — octal-style literals; the
                # values are unchanged.)
                self.last_commit_ts = datetime(1973, 6, 23, 10, 10, 10)
            else:
                self.last_commit_ts = datetime.now()
            vcs.init('.', self.last_commit_ts)
            self.seedRepo(starting)
        (self.allDirs, self.allFiles) = self.scanRepo()
        self.filemods = collections.defaultdict(lambda: 0)
        self.numCommits = self.vcs.countCommits()
        self.last_commit_ts = self.vcs.getLastCommitTS()

    def seedRepo(self, starting):
        """Populate a freshly-initialized repo with 'starting' files and
        commit them."""
        # We need to seed it with some files
        if self.subdir:
            self.allDirs.append(self.subdir)
        else:
            self.allDirs.append('.')
        self.newRandomFiles(starting, isSeed=True)
        self.commit()

    def scanRepo(self):
        """Scan the repository and return (allDirs, allFiles), both sorted
        for determinism; restricted to self.subdir when one is set."""
        allFiles = []
        vcsFiles = self.vcs.scan()
        dirs = set()
        for line in vcsFiles:
            line = line.rstrip()
            # might be quoted (thanks git)
            line = line.rstrip('"')
            line = line.lstrip('"')
            if self.subdir and not line.startswith(self.subdir):
                continue
            allFiles.append(line)
            d = os.path.dirname(line)
            dirs.add(d)
        allDirs = list(dirs)
        if not allDirs:
            # Empty repo: fall back to the subdir (or the repo root).
            if self.subdir:
                allDirs = [self.subdir]
            else:
                allDirs = ['']
        # sort both lists for determinism
        allFiles.sort()
        allDirs.sort()
        return (allDirs, allFiles)

    def expPickFiles(self, numFiles, lamb = 0.5):
        """Pick up to 'numFiles' distinct files, biased toward the front of
        self.allFiles via an exponential distribution with parameter 'lamb'."""
        n = len(self.allFiles)
        k = numFiles
        if n < k:
            k = n
        candidates = list(range(n))
        indices = []
        while len(indices) < k:
            # expbound() always returns < len(candidates), so this indexes safely.
            x = candidates[int(expbound(lamb, len(candidates)))]
            indices.append(x)
            candidates.remove(x)
        return [self.allFiles[i] for i in indices]

    def delRandomFiles(self, numFiles = None):
        """Delete 'numFiles' random files and return {num, rm_time} where
        num is the number of files deleted, and rm_time is the time it took
        to remove them with the SCM's 'rm' command.
        """
        ret = {'num': 0, 'rm_time': 0.0}
        # No files to delete => nothing to do
        if len(self.allFiles) == 0:
            return ret
        if not numFiles:
            numFiles = self.rand.pick("deletedfiles", 1, 3)
        # Refuse to delete more than half the repo in one cset.
        if numFiles == 0 or numFiles > len(self.allFiles)/2:
            return ret
        files = random.sample(self.allFiles, numFiles)
        ret['num'] = len(files)
        logging.debug("Deleting %d files" % (ret['num']))
        logging.debug("Files to delete: " + ','.join(files))
        if len(files) > 0:
            if not self.last_checkin_ts:
                # increment by 1 sec to avoid multiple csets in the same sec
                td = timedelta(seconds = 1)
                self.last_checkin_ts = self.last_commit_ts + td
            ret['rm_time'] = self.vcs.remove(files, self.last_checkin_ts)
        # Now remove the deleted files from the list of files
        self.allFiles = [x for x in self.allFiles if x not in files]
        self.allDirs = [os.path.dirname(x) for x in self.allFiles]
        # Make sure we stay inside 'subdir'
        if len(self.allDirs) == 0:
            if self.subdir:
                self.allDirs = [self.subdir]
                mkdir_p(self.subdir)
            else:
                self.allDirs = ['.']
        return ret

    def modifyRandomFiles(self, numFiles = None):
        """Modify 'numFiles' random files and return a dict with the number
        of files touched, checkout/checkin times and chunk counts, and the
        total file sizes before and after the edits.
        """
        ret = {'num': 0, 'co_time': 0.0, 'ci_time': 0.0,
               'co_chunks': 0, 'ci_chunks': 0,
               'before_total_size': 0, 'after_total_size': 0}
        # No files to modify => nothing to do
        if len(self.allFiles) == 0:
            return ret
        if not numFiles:
            numFiles = self.rand.pick("changedfiles", 1, 3)
        if numFiles == 0:
            return ret
        files = self.expPickFiles(numFiles, 0.25)
        ret['num'] = len(files)
        logging.debug("Modifying %d files" % (len(files)))
        if self.vcs.needsCheckout():
            # FIX: this used an undefined name 'n' and raised NameError for
            # any VCS that needs an explicit checkout.
            logging.debug("CHECKING OUT: %d files" % (len(files)))
            (ret['co_time'], ret['co_chunks']) = self.vcs.checkout(files)
        logging.debug("Files to modify: " + ','.join(files))
        for filename in files:
            ret['before_total_size'] += os.path.getsize(filename)
            with open(filename, 'r') as f:
                lines = f.readlines()
            ## find the number of 'deltas' we've done to this file.
            ds = self.filemods[filename]
            ## Calculate a lambda parameter for an exponential distribution
            ## model based on a linear model taken from the linux
            ## kernel. We add 2 to ds in order to keep discontinuities
            ## out.
            mu = -0.0221757 + 0.1782765 * (1./math.log(ds + 2.))
            mu /= 3
            l = 1./mu
            # Now generate the random numbers from an exp distribution
            rnd = random.expovariate(l)
            add = int(abs(round(rnd * len(lines))))
            rnd = expbound(l, 1.)
            remove = int(abs(round(rnd * len(lines))))
            # Guarantee that the file actually changes.
            if add == remove:
                add += 1
            logging.debug("MODIFYING: %d: %d +%d/-%d\t%s" %
                          (ds, len(lines), add, remove, filename))
            for __ in xrange(remove):
                if not lines:
                    break
                del lines[random.randrange(0, len(lines))]
            for __ in xrange(add):
                lines.insert(random.randint(0, len(lines)),
                             self.rand.genCodeLine(int(expbound(.1, 80))))
            with open(filename, 'w') as f:
                f.writelines(lines)
            self.filemods[filename] += 1
            ret['after_total_size'] += os.path.getsize(filename)
        if self.vcs.needsCheckin():
            if not self.last_checkin_ts:
                # increment by 1 sec to avoid multiple csets in the same second
                td = timedelta(seconds = 1)
                self.last_checkin_ts = self.last_commit_ts + td
            (ret['ci_time'], ret['ci_chunks']) = self.vcs.checkin(files, self.last_checkin_ts)
        return ret

    def newRandomFiles(self, numfiles = None, isSeed = False):
        """Create 'numfiles' new files with random content and stage them.
        Returns a dict with the number of new files, checkin time/chunks,
        and the total size of the created files.
        """
        ret = {'num': 0, 'ci_time': 0.0, 'ci_chunks': 0.0,
               'new_total_size': 0}
        if numfiles == None:
            numfiles = self.rand.pick("newfiles", 0, 3)
        if numfiles == 0:
            return ret
        ret['num'] = numfiles
        logging.debug("Adding %d files" % (numfiles))
        filesToCommit = []
        for x in range(0, numfiles):
            # pick a random directory, repetitions are ok
            n = len(self.allDirs)
            while True:
                dir = self.allDirs[int(expbound(0.025, n))]
                # TODO: model depth so that the files follow some kind of
                # distribution.
                if random.randint(0, 100) < 20:
                    # 20% of the time, descend into a brand-new subdirectory.
                    if dir != '.':
                        subdir = os.path.join(dir, self.rand.getpath(1))
                    else:
                        subdir = self.rand.getpath(1)
                    mkdir_p(subdir)
                    dir = subdir
                # Create the new file under 'dir'
                if dir != '.':
                    fileName = os.path.join(dir, self.rand.getpath(1) + self.file_ext)
                else:
                    fileName = self.rand.getpath(1) + self.file_ext
                if fileName not in self.allFiles:
                    break
                logging.debug("DUP %s - SKIPPING" % (fileName))
            logging.debug("USING: %s" % (fileName))
            self.allFiles.insert(0, fileName)
            if dir not in self.allDirs:
                bisect.insort_left(self.allDirs, dir)
            logging.debug("Adding file: " + fileName)
            if isSeed:
                n = self.rand.pick("linesfilefirst")
            else:
                n = self.rand.pick("linesfileadded")
            with open(fileName, 'w') as f:
                while n > 0:
                    f.write(self.rand.genCodeLine(int(expbound(.025, 220)), newLine=True))
                    n -= 1
                f.write('\n')
            ret['new_total_size'] += os.path.getsize(fileName)
            filesToCommit.append(fileName)
        if len(filesToCommit) > 0:
            if not self.last_checkin_ts:
                # increment by 1 sec to avoid multiple csets in the same sec
                td = timedelta(seconds = 1)
                self.last_checkin_ts = self.last_commit_ts + td
            (ret['ci_time'], ret['ci_chunks']) = self.vcs.add(filesToCommit, self.last_checkin_ts)
        return ret

    def commit(self):
        """Commit the changes that have been staged by either modifyRandomFiles, or
        newRandomFiles. Returns the elapsed commit time in seconds.
        """
        message = self.rand.genLine(72, newLine = False)
        if self.last_checkin_ts:
            ts = self.last_checkin_ts + timedelta(seconds = 1)
        else:
            # increment by 1 sec to avoid multiple csets in the same sec
            td = timedelta(seconds = 1)
            ts = self.last_commit_ts + td
        elapsed = self.vcs.commit(message, ts)
        self.last_commit_ts = ts
        self.last_checkin_ts = None
        self.numCommits += 1
        return elapsed

    def sync(self):
        """Pull/push to base. Serialized across processes via self.cv and
        the shared next_proc counter; returns (pull_time, push_time)."""
        logging.info("%02d: Waiting for Lock" % (self.proc_num))
        self.cv.acquire()
        logging.info("%02d: Lock Acquired" % (self.proc_num))
        while self.next_proc.value != self.proc_num:
            logging.info("%02d: Waiting for Condition" % (self.proc_num))
            self.cv.wait()
        logging.info("%02d: Condition Matched" % (self.proc_num))
        (pull_time, push_time) = (0.0, 0.0)
        try:
            tip_before = self.vcs.getTip()
            pull_time = self.vcs.pull(self.base)
            tip_after = self.vcs.getTip()
            # Fold the pulled adds/deletes into our file and dir lists.
            deletes = []
            newFiles = []
            for (op, fn) in self.vcs.diffCommits(tip_before, tip_after):
                if op == 'A':
                    if self.subdir and not fn.startswith(self.subdir):
                        continue
                    newFiles.append(fn)
                    d = os.path.dirname(fn)
                    if d not in self.allDirs:
                        bisect.insort_left(self.allDirs, d)
                if op == 'D':
                    deletes.append(fn)
            newFiles.sort()
            self.allFiles = newFiles + self.allFiles
            self.allFiles = [x for x in self.allFiles if x not in deletes]
            self.allDirs = [os.path.dirname(x) for x in self.allFiles]
            push_time = self.vcs.push(self.base)
        finally:
            # Find the next live proc and hand the baton over, even on error.
            logging.info("%02d: livemap: %s" % (self.proc_num, self.live_procs))
            s = self.proc_num - 1
            i = (s + 1) % self.num_procs
            while (i != s) and (self.live_procs[i] == 0):
                i = (i + 1) % self.num_procs
            self.next_proc.value = i+1
            logging.info("%02d: set next_proc to %d" % (self.proc_num, self.next_proc.value))
            self.cv.notify_all()
            logging.info("%02d: notified all" % (self.proc_num))
            self.cv.release()
            logging.info("%02d: Lock Released" % (self.proc_num))
        return (pull_time, push_time)
class Git(object):
    # Class-level defaults; __init__ overrides these per instance.
    binary = 'git'     # path to the git executable
    options = None     # parsed command-line options
    name = 'git'       # SCM name used in reports
    tip = None         # last tip revision returned by getTip()
    git_config = None  # path of the temporary GIT_CONFIG file
    """
    Encapsulates a Git repository.
    """
    def __init__(self, opts):
        """
        Prepare a new git repo or make sure we're running on an existing
        one.
        """
        self.options = opts
        self.binary = opts.binary
        self.name = 'git'
        logging.debug("Using binary: %s" % (self.binary))
        print("# Using binary: %s" % (self.binary))
        print("# " + "\n# ".join(subprocess.check_output([self.binary, 'version']).split("\n")))
        ## Create a new configuration file
        git_config = tempfile.NamedTemporaryFile(delete=False)
        print("[user]", file=git_config)
        print("\tname = \"BitKeeper Simulator\"", file=git_config)
        print("\temail = \"[email protected]\"", file=git_config)
        print("[mergetool]", file=git_config)
        print("\tkeepBackup = false", file=git_config)
        print("[mergetool.fakemerge]", file=git_config)
        print("\tprompt = false", file=git_config)
        print("\tcmd = %s \"$LOCAL\" \"$BASE\" \"$REMOTE\" \"$MERGED\"" % (opts.mergeprog), file=git_config)
        print("\ttrustexitcode = true", file=git_config)
        git_config.close()
        self.git_config = git_config.name
        # Point every subsequent git invocation at our private config file.
        os.environ["GIT_CONFIG"] = self.git_config
        config = "Configured as: \n---\n%s\n---" % (subprocess.check_output([self.binary, 'config', '--list']))
        print("# " + "\n# ".join(config.split("\n")))
        print("# Environment:")
        for k in filter(lambda x: x[0:3] == "GIT" or x[0:4] == "_GIT", os.environ.keys()):
            print("# %s = %s" % (k, os.environ[k]))
    def __del__(self):
        """
        Clean up the temporary config file created in __init__.
        """
        if self.git_config:
            os.unlink(self.git_config)
    def is_repoRoot(self, dir):
        """True when 'dir' contains a .git entry."""
        return '.git' in os.listdir(dir)
    def init(self, dir, start_date):
        """Create a new empty repo in 'dir'; start_date is unused for git."""
        cmd = [self.binary, 'init', '-q', dir]
        runCmd(cmd)
    def countCommits(self):
        """Return the number of commits on HEAD (one 'log --oneline' line each)."""
        cmd = [self.binary, 'log', '--oneline']
        cmdout = subprocess.check_output(cmd).rstrip()
        n = len(cmdout.split('\n'))
        return n
    def getTip(self):
        """Return (and cache in self.tip) the sha of HEAD."""
        self.tip = subprocess.check_output([self.binary, 'rev-parse', 'HEAD'])
        self.tip = self.tip.rstrip()
        return self.tip
    def diffCommits(self, rev1, rev2):
        """Yield (op, filename) for each file changed between rev1 and rev2,
        where op is git's one-letter status (A/D/M/...)."""
        cmd = [self.binary, 'diff-tree', '-r', rev1, rev2]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        for line in p.stdout:
            # Raw diff-tree line: ':modeA modeB sha1A sha1B op<TAB>file'
            (modeA, modeB, sha1A, sha1B, op, fn) = line.rstrip().split()
            yield (op, fn)
        p.wait()
    def scan(self):
        """Yield every tracked filename under HEAD, unescaping git's quoting.
        NOTE: 'string_escape' is a Python 2-only codec."""
        cmd = [self.binary, 'ls-tree', '-r', '--name-only', 'HEAD']
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        for line in p.stdout:
            yield line.rstrip().decode('string_escape')
        p.wait()
    def needsCheckout(self):
        """Git working trees are always writable; no checkout step."""
        return False
    def needsCheckin(self):
        """Git requires staging ('git add') before commit."""
        return True
    def checkout(self, files):
        # Never called because needsCheckout() is False.
        raise NotImplementedError()
    def checkin(self, files, checkin_date):
        """Stage 'files'; checkin_date is unused for git. Returns runCmd()'s
        (elapsed, chunks)."""
        cmd = [self.binary, 'add']
        logging.debug(cmd + files)
        return runCmd(cmd, files)
    def add(self, files, add_date):
        """Adding new files is the same staging operation as checkin."""
        return self.checkin(files, add_date)
    def remove(self, files, rm_date):
        """Remove 'files' from the index and working tree; rm_date unused."""
        cmd = [self.binary, 'rm', '-f', '--quiet']
        logging.debug(cmd)
        return runCmd(cmd, files)
    def commit(self, message, commit_date):
        """Commit staged changes; forges the date only when simulating time.
        Returns elapsed seconds."""
        cmd = [self.binary, 'commit']
        if self.options.sim_time:
            cmd += ['--date', commit_date.isoformat()]
        cmd += ['--no-verify', '--no-status', '-m', message]
        logging.debug(cmd)
        (elapsed, __) = runCmd(cmd)
        return elapsed
    def clone(self, from_url, to_url):
        """Clone from_url into to_url; returns elapsed seconds."""
        cmd = [self.binary, 'clone', '-q']
        (elapsed, __) = runCmd(cmd, [from_url, to_url])
        return elapsed
    def pull(self, url):
        """Pull 'master' from 'url'; on conflict, run the fake mergetool and
        commit the merge. Returns elapsed seconds."""
        pull_cmd = [self.binary, 'pull', '-q', '--no-edit', url, 'master']
        mergetool_cmd = [self.binary, 'mergetool', '--tool=fakemerge', '--no-prompt']
        commit_cmd = [self.binary, 'commit', '-m',
                      'Auto merged using: ' + self.options.mergeprog]
        ## We can't use runCmd here because pull fails on conflicts and we're relying on
        ## exceptions to do the merge
        start = time.time()
        try:
            logging.debug("Running Pull: %s" % (' '.join(pull_cmd)))
            subprocess.check_call(pull_cmd, stdout = DEVNULL, stderr = DEVNULL)
        except subprocess.CalledProcessError as e:
            if self.options.mergeprog:
                logging.debug("Running Merge: %s" % (' '.join(mergetool_cmd)))
                subprocess.check_call(mergetool_cmd, stdout = DEVNULL, stderr = DEVNULL)
                logging.debug("Running Commit: %s" % (' '.join(commit_cmd)))
                subprocess.check_call(commit_cmd, stdout = DEVNULL, stderr = DEVNULL)
            pass
        finally:
            elapsed = time.time() - start
        logging.debug("Pull Done")
        return elapsed
    def push(self, url):
        """Push 'master' to 'url'; returns elapsed seconds."""
        cmd = [self.binary, 'push', '-q', url, 'master']
        (elapsed, __) = runCmd(cmd)
        return elapsed
    def getLastCommitTS(self):
        """Return the author date of HEAD as a datetime."""
        cmd = [self.binary, 'log', '-1', '--format=%ai', 'HEAD']
        logging.debug(cmd)
        cmdout = subprocess.check_output(cmd).rstrip()
        return dateutil.parser.parse(cmdout)
class Mercurial(object):
    """
    Encapsulates a Mercurial repository.
    """
    # Class-level defaults; __init__ overrides these per instance.
    binary = 'hg'    # path to the hg executable
    options = None   # parsed command-line options
    name = 'hg'      # SCM name used in reports
    tip = None       # last tip revision returned by getTip()

    def __init__(self, opts):
        """
        Prepare a new mercurial repo or make sure we're running on an existing
        one.
        """
        self.options = opts
        self.binary = opts.binary
        self.name = 'hg'
        logging.debug("Using binary: %s" % (self.binary))
        print("# Using binary: %s" % (self.binary))
        print("# " + "\n# ".join(subprocess.check_output([self.binary, 'version']).split("\n")))

    def is_repoRoot(self, dir):
        """True when 'dir' contains a .hg entry."""
        return '.hg' in os.listdir(dir)

    def init(self, dir, start_date):
        """Create a new empty repo in 'dir'; start_date is unused for hg."""
        cmd = [self.binary, 'init', dir]
        runCmd(cmd)

    def getTip(self):
        """Return (and cache in self.tip) the tip revision number."""
        self.tip = subprocess.check_output([self.binary, 'tip', '-T', '{rev}'])
        return self.tip

    def countCommits(self):
        """Return the number of commits (one '{rev}' line per changeset)."""
        cmd = [self.binary, 'log', '--template', '{rev}\\n']
        cmdout = subprocess.check_output(cmd).rstrip()
        n = len(cmdout.split('\n'))
        return n

    def scan(self):
        """Yield every tracked filename ('hg locate')."""
        cmd = [self.binary, 'locate']
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        for line in p.stdout:
            yield line.rstrip()
        p.wait()

    def needsCheckout(self):
        """Mercurial working copies are always writable; no checkout step."""
        return False

    def needsCheckin(self):
        """Mercurial commits modified files directly; no staging step."""
        return False

    def checkout(self, files):
        # Never called because needsCheckout() is False.
        raise NotImplementedError()

    def checkin(self, files, checkin_date):
        # Never called because needsCheckin() is False.
        raise NotImplementedError()

    def add(self, files, add_date):
        """Track new 'files'; add_date is unused for hg. Returns runCmd()'s
        (elapsed, chunks)."""
        cmd = [self.binary, 'add']
        logging.debug(cmd + files)
        return runCmd(cmd, files)

    def remove(self, files, rm_date):
        """Remove 'files' from the repo; rm_date unused."""
        cmd = [self.binary, 'rm', '-f']
        logging.debug(cmd)
        return runCmd(cmd, files)

    def commit(self, message, commit_date):
        """Commit outstanding changes as user 'simulate'; forges the date
        only when simulating time. Returns elapsed seconds."""
        cmd = [self.binary, 'commit', '-u', 'simulate']
        if self.options.sim_time:
            cmd += ['-d', commit_date.isoformat(' ')]
        cmd += ['-m', message]
        logging.debug(cmd)
        (elapsed, __) = runCmd(cmd)
        return elapsed

    def clone(self, from_url, to_url):
        """Clone from_url into to_url; returns elapsed seconds."""
        cmd = [self.binary, 'clone', '-q']
        (elapsed, __) = runCmd(cmd, [from_url, to_url])
        return elapsed

    def pull(self, url):
        """Pull from 'url', merge, and commit the merge; returns elapsed
        seconds for the whole sequence."""
        pull_cmd = [self.binary, 'pull', '-q']
        merge_cmd = [self.binary, 'merge', '-q']
        commit_cmd = [self.binary, 'commit', '-q', '-m', 'Merge']
        start = time.time()
        runCmd(pull_cmd, [url])
        # FIX: was 'rumCmd(...)' — a NameError on every pull.
        runCmd(merge_cmd)
        runCmd(commit_cmd)
        elapsed = time.time() - start
        return elapsed

    def push(self, url):
        """Push to 'url'; returns elapsed seconds."""
        cmd = [self.binary, 'push', '-q']
        (elapsed, __) = runCmd(cmd, [url])
        return elapsed

    def getLastCommitTS(self):
        """Return the date of the tip changeset as a datetime."""
        cmd = [self.binary, 'log', '--template={date|isodate}', '-rtip']
        logging.debug(cmd)
        cmdout = subprocess.check_output(cmd).rstrip()
        return dateutil.parser.parse(cmdout)
class BitKeeper(object):
    # Class-level defaults; __init__ overrides these per instance.
    binary = 'bk'          # path to the bk executable
    dotbk = ''             # BK_DOTBK directory, when one is configured
    options = None         # parsed command-line options
    name = 'bk'            # SCM name used in reports
    # NOTE(review): filesForCommit is a class-level mutable list shared by
    # all instances until commit() rebinds it — confirm only one BitKeeper
    # instance exists per process.
    filesForCommit = []    # files accumulated for the next commit
    ciCmd = [binary]       # checkin command prefix, finished in __init__
    commitCmd = [binary]   # commit command prefix, finished in __init__
    tip = None             # last tip revision returned by getTip()
    """
    Encapsulates a BitKeeper repository
    """
    def __init__(self, opts):
        """
        Prepare a new bitkeeper repo or make sure we're running on an
        existing one.
        """
        self.options = opts
        self.binary = opts.binary
        self.ciCmd = [self.binary]
        self.commitCmd = [self.binary]
        ## Clear the environment of anything BK related
        for k in os.environ.keys():
            if k[0:2] == "BK" or k[0:3] == "_BK":
                del os.environ[k]
        os.environ["BK_CONFIG"] = ';'.join([
            'checkout:edit!'
        ])
        os.environ["TMPDIR"] = "/tmp"
        if self.options.no_uniq:
            os.environ["_BK_NO_UNIQ"] = "1"
        if self.options.dotbk:
            self.dotbk = self.options.dotbk
            mkdir_p(self.dotbk)
            os.environ["BK_DOTBK"] = self.dotbk
        # Accept the license so later commands don't prompt.
        runCmd([self.binary, '_eula', '-a'])
        self.name = 'bk'
        print("# BINARY: %s" % (self.binary))
        # Determine what command line to use for adding new files
        if self.options.commit_modified:
            print('# Skipping checkin due to --commit-modified')
        else:
            self.ciCmd += ['ci', '-aq']
            print('#\n# CHECKIN: %s' % ' '.join(self.ciCmd))
        if self.options.trace:
            self.commitCmd += ['--trace=cmd,perf', '--trace-file=/tmp/commit-%s.perf' % (os.getpid())]
        self.commitCmd += ['commit', '-q']
        if self.options.opt_import:
            self.commitCmd += ['--import']
        if self.options.commit_modified:
            self.commitCmd += ['--ci']
        print('# COMMIT: %s\n#' % ' '.join(self.commitCmd))
        print("# " + "\n# ".join(subprocess.check_output([self.binary, 'version']).split("\n")))
        config = "Configured as: \n---\n%s\n---" % (subprocess.check_output([self.binary, 'config', '-v']))
        print("# " + "\n# ".join(config.split("\n")))
        print("# Environment:")
        for k in filter(lambda x: x[0:2] == "BK" or x[0:3] == "_BK", os.environ.keys()):
            print("# %s = %s" % (k, os.environ[k]))
        # Remove any stale trace file from a previous run.
        try:
            os.unlink("/tmp/commit.perf")
        except OSError as e:
            if e.errno != os.errno.ENOENT:
                raise
    def is_repoRoot(self, dir):
        """True when 'dir' looks like a bk repo root (layout depends on
        whether we run in --compat mode)."""
        if self.options.compat:
            return set(['SCCS', 'BitKeeper']) & set(os.listdir(dir)) == set(['SCCS', 'BitKeeper'])
        else:
            return set(['.bk']) & set(os.listdir(dir)) == set(['.bk'])
    def init(self, dir, start_date):
        """Create a new repo in 'dir'; forge the start date only when
        simulating time (everything pretends to be in California, -8:00)."""
        cmd = [self.binary, 'init']
        if self.options.compat:
            cmd.append('--compat')
        cmd.append(dir)
        if self.options.sim_time:
            # since dealing with tz info is not easy, we're all in california
            os.environ['BK_DATE_TIME_ZONE'] = start_date.strftime("%y/%m/%d %H:%M:%S-8:00")
        runCmd(cmd)
        if self.options.sim_time:
            del os.environ['BK_DATE_TIME_ZONE']
    def countCommits(self):
        """Return the number of changesets, not counting the 1.0 and 1.1
        csets every repo starts with."""
        cmd = [self.binary, 'changes', '-and:I:']
        cmdout = subprocess.check_output(cmd).rstrip()
        n = len(cmdout.split('\n'))
        ## Remove 1.0 & 1.1 from count
        n = (n - 2)
        return n
    def getTip(self):
        """Return (and cache in self.tip) the MD5KEY of the tip cset."""
        self.tip = subprocess.check_output([self.binary, 'changes', '-d:MD5KEY:', '-r+'])
        return self.tip
    def diffCommits(self, rev1, rev2):
        """Yield ('A'|'D'|'M', filename) for each file changed between rev1
        and rev2, parsed from 'bk rset' output."""
        cmd = [self.binary, 'rset', '-Hh', "-r%s..%s" % (rev1, rev2)]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        for line in p.stdout:
            (sfile, fileA, revA, fileB, revB) = line.rstrip().split('|')
            if revA == "1.0" and "BitKeeper/deleted" in fileB:
                ## File created and deleted in range, just ignore it
                continue
            if revA == "1.0":
                yield ('A', fileB)
            elif "BitKeeper/deleted" in fileB:
                yield ('D', fileA)
            else:
                yield ('M', fileA)
        p.wait()
    def scan(self):
        """Yield every user filename in the repo ('bk -U')."""
        cmd = [self.binary, '-U']
        logging.debug(cmd);
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        for line in p.stdout:
            yield line.rstrip()
        p.wait()
    def needsCheckout(self):
        # Assumes checkout:edit
        return False
    def needsCheckin(self):
        """BitKeeper requires an explicit checkin (ci) before commit."""
        return True
    def checkout(self, files):
        """Lock 'files' for editing; returns runCmd()'s (elapsed, chunks)."""
        cmd = [self.binary, 'edit', '-qS']
        logging.debug(' '.join(cmd))
        return runCmd(cmd, files)
    def checkin(self, files, delta_date):
        """Checkin 'files'; with --commit-modified we only queue them and
        let commit() do the work."""
        if self.options.commit_modified:
            self.filesForCommit += files
            return (0.0, 0)
        return self.add(files, delta_date)
    def add(self, files, add_date):
        """Checkin new 'files' (queued for the next commit either way);
        forges the delta date only when simulating time."""
        self.filesForCommit += files
        if self.options.commit_modified:
            return (0.0, 0)
        cmd = list(self.ciCmd)
        cmd += ['-y' + self.options.rand.genLine(72, False), '-']
        logging.debug(cmd + files)
        if self.options.sim_time:
            os.environ['BK_DATE_TIME_ZONE'] = add_date.strftime("%y/%m/%d %H:%M:%S-8:00")
        ret = runCmd(cmd, files)
        if self.options.sim_time:
            del os.environ['BK_DATE_TIME_ZONE']
        return ret
    def rm_name(self, file):
        """Return the BitKeeper/deleted name 'file' will get when removed."""
        return subprocess.check_output([self.binary, 'log', '-r+', '-d:RM_NAME:', file])
    def remove(self, files, rm_date):
        """Remove 'files' (queueing their deleted names for the next commit);
        forges the delta date only when simulating time."""
        self.filesForCommit += [self.rm_name(x) + '|+' for x in files]
        cmd = [self.binary, 'rm']
        logging.debug(cmd)
        if self.options.sim_time:
            os.environ['BK_DATE_TIME_ZONE'] = rm_date.strftime("%y/%m/%d %H:%M:%S-8:00")
        ret = runCmd(cmd, files)
        if self.options.sim_time:
            del os.environ['BK_DATE_TIME_ZONE']
        return ret
    def commit(self, message, commit_date):
        """Commit queued changes; with --commit-modified the queued file list
        is passed on stdin. Returns elapsed seconds."""
        cmd = list(self.commitCmd)
        cmd += ['-y' + message]
        if self.options.commit_modified:
            cmd += ['-']
            # remove dups
            self.filesForCommit = list(set(self.filesForCommit))
        logging.debug(cmd)
        if self.options.sim_time:
            os.environ['BK_DATE_TIME_ZONE'] = commit_date.strftime("%y/%m/%d %H:%M:%S-8:00")
        if self.options.commit_modified:
            (elapsed, __) = runCmd(cmd, self.filesForCommit)
        else:
            (elapsed, __) = runCmd(cmd)
        if self.options.sim_time:
            del os.environ['BK_DATE_TIME_ZONE']
        self.filesForCommit = []
        return elapsed
    def clone(self, from_url, to_url):
        """Clone from_url into to_url; returns elapsed seconds."""
        cmd = [self.binary, 'clone', '-q']
        (elapsed, __) = runCmd(cmd, [from_url, to_url])
        return elapsed
    def pull(self, url):
        """Pull from 'url'; returns elapsed seconds."""
        if self.options.mergeprog:
            os.environ['BK_RESOLVE_MERGEPROG'] = self.options.mergeprog
        cmd = [self.binary, 'pull', '-q']
        (elapsed, __) = runCmd(cmd, [url])
        # NOTE(review): this del raises KeyError when --mergeprog was not
        # given (the variable is only set above under that option) — confirm
        # callers always supply a mergeprog.
        del os.environ['BK_RESOLVE_MERGEPROG']
        return elapsed
    def push(self, url):
        """Push to 'url'; returns elapsed seconds."""
        cmd = [self.binary, 'push', '-q']
        (elapsed, __) = runCmd(cmd, [url])
        return elapsed
    def getLastCommitTS(self):
        """Return the date/time/timezone of the tip cset as a datetime."""
        cmd = [self.binary, 'changes', '-nd:D_: :T: :TZ:', '-r+']
        logging.debug(cmd)
        cmdout = subprocess.check_output(cmd).rstrip()
        return dateutil.parser.parse(cmdout)
class Random:
    """Random content generators for the simulation, driven either by
    empirical CDFs loaded from a JSON file or by a uniform distribution over
    the system word list."""
    jsonFile = None  # path of the JSON CDF file, when one was given

    def __init__(self, cdf_file = None):
        """Load CDFs from 'cdf_file' (JSON of key -> [[value, prob], ...]) or,
        when no file is given, fall back to /usr/share/dict/words.

        FIX: cdf/words used to be class-level mutables shared by every
        instance; they are now per-instance.
        """
        # collections.Iterable was removed in Python 3.10.
        try:
            from collections.abc import Iterable
        except ImportError:
            from collections import Iterable
        self.cdf = collections.defaultdict(lambda: None)
        self.words = []
        if cdf_file:
            with open(cdf_file, 'r') as f:
                probs = json.load(f)
            def cdf(l):
                # Turn [[value, prob], ...] into (values, cumulative-probs),
                # most probable value first; probs are normalized to sum 1.
                if not l:
                    return [], []
                vals, probs = zip(*sorted(l, key=lambda x: x[1], reverse=True))
                t = float(sum(probs, 0))
                s, cdfs = 0, []
                for v in probs:
                    s += v
                    cdfs.append(s / t)
                return vals, cdfs
            for k in probs.keys():
                # Skip scalar entries; only lists of pairs become CDFs.
                if not isinstance(probs[k], Iterable):
                    continue
                self.cdf[k] = cdf(probs[k])
            self.jsonFile = cdf_file
            print("#\n# Using %s for random distributions\n#" % (cdf_file))
        else:
            print("#\n# Using uniform distribution from /usr/share/dict/words\n#")
            # FIX: close the word list instead of leaking the file handle.
            with open('/usr/share/dict/words', 'r') as f:
                for word in f:
                    self.words.append(word.strip())

    def pick(self, key = "", a = 1, b = 3):
        """Draw a value from the CDF named 'key', or uniformly from [a, b]
        when no CDF file was loaded."""
        if self.cdf[key]:
            return self.cdf[key][0][bisect.bisect_left(self.cdf[key][1], random.random())]
        # If we don't find the key but provided a file, don't fall back to uniform
        # distributions without telling the user.
        assert not self.jsonFile, "Failed to find key %s in file %s" % (key, self.jsonFile)
        return random.randint(a, b)

    def getpath(self, num = 4):
        """Return a random path component (from the 'paths' CDF or the word
        list). NOTE(review): 'num' is accepted but unused — confirm callers."""
        if self.cdf["paths"]:
            return random.choice(self.cdf["paths"][0])
        return self.words[random.randint(0, len(self.words)-1)]

    def haveCDF(self, key):
        """True when a CDF named 'key' was loaded."""
        return bool(self.cdf[key])

    def genLine(self, length = 75, newLine = True):
        """Gen random string of word-like stuff, truncated to length-1
        characters (plus an optional trailing newline)."""
        res = ""
        while len(res) < length:
            if res:
                res += " "
            if self.cdf["comments"]:
                res += self.pick("comments")
            else:
                res += self.words[random.randint(0, len(self.words)-1)]
        res = res[0:length-1]
        if newLine:
            res += '\n'
        return res

    def genCodeLine(self, length = 75, newLine = True):
        """Gen random code line from the word list, truncated like genLine."""
        res = ""
        while len(res) < length:
            if res:
                res += " "
            res += self.words[random.randint(0, len(self.words)-1)]
        res = res[0:length-1]
        if newLine:
            res += '\n'
        return res
def runSim(opts, allStart, vcs, output = sys.stdout, override = None):
    """Run a simulation, output in 'output'.

    'opts' is copied into a fresh optparse.Values so that per-worker
    'override' entries (repo root, seed, sync primitives, ...) never
    mutate the caller's options.  Emits one CSV row per committed
    changeset.  Returns 1 on error or early stop, None otherwise.
    """
    options = optparse.Values()
    for o, v in opts.__dict__.items():
        setattr(options, o, v)
    if override:
        for o, v in override.items():
            setattr(options, o, v)
    # And pass it to the Simulation object
    sim = Simulation(options, vcs)
    print ("## Found %d commits, %d files, and %d directories in the repo" %
           (sim.numCommits, len(sim.allFiles), len(sim.allDirs)), file = output)
    if not options.numCommits and sim.numCommits >= options.maxCommits:
        print ("# Nothing to do: Found %d commits, you want %d" %
               (sim.numCommits, options.maxCommits), file = output)
        return 1
    # Output for each changeset, format is:
    # NAME OF COLUMN, PRINTF STRING, VARIABLE TO PRINT
    variables = [
        ['proc_num',        '%s',   "options.proc_num"          ],
        ['label',           '%s',   "options.label"             ],
        ['scm',             '%s',   "sim.name"                  ],
        ['total_commits',   '%d',   "sim.numCommits"            ],
        ['total_files',     '%d',   "len(sim.allFiles)"         ],
        ['total_dirs',      '%d',   "len(sim.allDirs)"          ],
        ['mods_in_cset',    '%d',   "modh['num']"               ],
        ['new_in_cset',     '%d',   "newh['num']"               ],
        ['del_in_cset',     '%d',   "delh['num']"               ],
        ['total_time',      '%.3f', "elapsed"                   ],
        ['checkout_time',   '%.3f', "modh['co_time']"           ],
        ['checkin_time',    '%.3f', "modh['ci_time']"           ],
        ['new_time',        '%.3f', "newh['ci_time']"           ],
        ['commit_time',     '%.3f', "commit"                    ],
        ['mod_co_chunks',   '%d',   "modh['co_chunks']"         ],
        ['mod_ci_chunks',   '%d',   "modh['ci_chunks']"         ],
        ['new_ci_chunks',   '%d',   "newh['ci_chunks']"         ],
        ['mod_before_size', '%d',   "modh['before_total_size']" ],
        ['mod_after_size',  '%d',   "modh['after_total_size']"  ],
        ['new_size',        '%d',   "newh['new_total_size']"    ]
    ]
    # Build the printf-style row template once; it is eval()'d each
    # iteration against the loop locals (modh/newh/delh/elapsed/commit).
    fmt_line = '"' + ','.join([x[1] for x in variables]) + '"'
    fmt_line += ' % (' + ', '.join([x[2] for x in variables]) + ')'
    if not options.noHeaders:
        print(','.join(x[0] for x in variables), file = output)
    if options.fail_if:
        # Sanity-check the user-supplied expression up front; it will be
        # eval()'d after every commit.  NOTE(review): the 'parser' module
        # (imported as pyparser) was removed in Python 3.10.
        try:
            st = pyparser.expr(options.fail_if)
            if not st.isexpr():
                print ("# Not an expression", file=sys.stderr)
                return 1
        except SyntaxError:
            print ("# Could not parse expression: %s" % (options.fail_if), file=sys.stderr)
            return 1
    if options.numCommits:
        commitCount = options.numCommits
    else:
        assert(options.maxCommits)
        commitCount = options.maxCommits - sim.numCommits
        if commitCount < 0:
            # BUG FIX: the format string takes two %d arguments; the old
            # code passed only options.maxCommits and raised TypeError.
            print ("--maxCommits=%d is less than %d commits found in repo" %
                   (options.maxCommits, sim.numCommits), file=sys.stderr)
            return 1
    if options.parallel:
        # Split the work evenly across workers; worker 1 absorbs the
        # remainder.  '//' keeps integer division under Python 3 too.
        n = commitCount // options.parallel
        if options.proc_num == 1:
            n += commitCount % options.parallel
        commitCount = n
    total = 0.0
    average = 0.0
    count = 0
    if options.sync:
        # Number of commits until the next pull/push, jittered around --sync.
        resync = int(random.gauss(options.sync, options.sync // 4))
    # NOTE: 'max'/'min' deliberately shadow the builtins -- they are part
    # of the local namespace visible to the user's --fail-if expression,
    # so they must keep these names.
    max = 0.0
    min = 100000000.0
    while count < commitCount:
        start = time.time()
        if options.do_deletes:
            delh = sim.delRandomFiles()
        else:
            delh = {'num': 0, 'rm_time': 0.0}
        modh = sim.modifyRandomFiles()
        newh = sim.newRandomFiles()
        if delh['num'] + modh['num'] + newh['num'] == 0:
            # Nothing changed this round; roll the dice again.
            continue
        try:
            commit = sim.commit()
        except:
            # Dump the cset composition for debugging, then re-raise.
            print ("%d %d %d" % (delh['num'], modh['num'], newh['num']), file=sys.stderr)
            raise
        elapsed = time.time() - start
        total += elapsed
        # Running mean without keeping the full sample.
        average = ((average * count) + elapsed) / (count + 1)
        if max < elapsed:
            max = elapsed
        if min > elapsed:
            min = elapsed
        count += 1
        out_line = eval(fmt_line)
        print(out_line, file = output)
        logging.debug(out_line)
        if options.fail_if:
            # eval() of a user-controlled expression is intentional here:
            # --fail-if is an operator-supplied stop condition.
            try:
                if eval(options.fail_if):
                    print ("# Stopping because condition '%s' became true" %
                           (options.fail_if), file=sys.stderr)
                    return 1
            except Exception as e:
                print ("Failed to eval: '%s'" % (options.fail_if), file=sys.stderr)
                print (str(e), file=sys.stderr)
                return 1
        if options.sync:
            resync -= 1
            if resync == 0:
                # Time to exchange work with the --base repo.
                (pull_time, push_time) = sim.sync()
                print("# Process %02d: %d of %d csets [%.3f, %.3f]" %
                      (options.proc_num, count, commitCount,
                       pull_time, push_time))
                sys.stdout.flush()
                resync = int(random.gauss(options.sync, options.sync // 4))
    if options.sync:
        # Final sync so the last batch of commits is pushed.
        (pull_time, push_time) = sim.sync()
        print("# Process %02d: %d of %d csets [%.3f, %.3f]" %
              (options.proc_num, count, commitCount,
               pull_time, push_time))
        sys.stdout.flush()
    allTotal = time.time() - allStart
    print("# Total time: %.3f Ops: %.3f Overhead: %.3f" %
          (allTotal, total, allTotal - total), file = output)
    print("# Total time for humans: %s" % (timedelta(seconds=allTotal)), file = output)
    print("# Average total time: %.3f %.3f %.3f " % (min, average, max), file = output)
    output.flush()
    if options.proc_num:
        # Parallel mode: mark this worker dead and, if the shared
        # next_proc token points at us, pass it to the next live worker
        # so the round-robin sync ordering keeps making progress.
        print("# Process %02d DONE" % (options.proc_num))
        logging.info("%02d: Waiting for Lock" % (options.proc_num))
        options.cv.acquire()
        logging.info("%02d: Lock Acquired" % (options.proc_num))
        options.live_procs[options.proc_num - 1] = 0
        logging.info("%02d: livemap: %s" % (options.proc_num, options.live_procs))
        logging.info("%02d: found next_proc set to %d" % (options.proc_num, options.next_proc.value))
        if options.next_proc.value == options.proc_num:
            # Advance to next live proc
            s = options.proc_num - 1
            i = (s + 1) % options.num_procs
            while (i != s) and (options.live_procs[i] == 0):
                i = (i + 1) % options.num_procs
            options.next_proc.value = i + 1
            logging.info("%02d set next_proc to %d" % (options.proc_num, options.next_proc.value))
        options.cv.notify_all()
        logging.info("%02d: notified all" % (options.proc_num))
        options.cv.release()
def main():
    """Parse options, validate the SCM choice, then run the simulation --
    either in-process or fanned out across --parallel worker processes
    that coordinate through a multiprocessing Manager/Condition."""
    allStart = time.time()
    parser = optparse.OptionParser()
    parser.add_option('--base', type='string', default=None)
    parser.add_option('--bk', action="store_true", default=False)
    parser.add_option('--binary', type='string', default='')
    parser.add_option('--compat', action="store_true", default=False)
    parser.add_option('--do-deletes', action="store_true", default=False)
    parser.add_option('--dotbk', type='string', default=None)
    parser.add_option('--trace', action="store_true", default=False)
    parser.add_option('--opt_import', action="store_true", default=False)
    parser.add_option('--debug', action="store_true", default=False)
    parser.add_option('--parallel', type='int', default=None)
    parser.add_option('--hg', action="store_true", default=False)
    parser.add_option('--git', action="store_true", default=False)
    parser.add_option('--label', help='', default=None)
    parser.add_option('--new-file-ext', help='', default=None)
    parser.add_option('--maxCommits', type="int", default=None)
    parser.add_option('--numCommits', type="int", default=None)
    parser.add_option('--no-uniq', action="store_true", default=False)
    parser.add_option('--noHeaders', action="store_true", default=False)
    parser.add_option('--repoRoot', help='', default=None)
    parser.add_option('--seed', type="int", default=0)
    parser.add_option('--subdir', default=None)
    parser.add_option('--sync', type="int", default=None)
    parser.add_option('--with-cdf', help='Set the CDF for randomness', default=None);
    parser.add_option('--sim-time', action="store_true", default=False)
    parser.add_option('--starting-files', type="int", default=0)
    parser.add_option('--fail-if', type='string', default=None)
    parser.add_option('--commit-modified', action="store_true", default=False)
    (options, args) = parser.parse_args()
    # --parallel clones its repos from --base, so --repoRoot is only
    # mandatory in the non-parallel case.
    if not options.repoRoot and not (options.parallel and options.base):
        if options.parallel and not options.base:
            print ("--parallel requires --base", file=sys.stderr)
        elif not options.repoRoot:
            print ("--repoRoot is a required option", file=sys.stderr)
        else:
            print ("Logic error!", file=sys.stderr)
        return 1
    if options.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.ERROR)
    # Exactly one stop condition must be given: a target total
    # (--maxCommits) or a number of new commits (--numCommits).
    if not (bool(options.maxCommits) ^ bool(options.numCommits)):
        print ("Exactly one of --maxCommits and --numCommits", file=sys.stderr)
        return 1
    if options.commit_modified and not options.bk:
        print ("--commit-modified is a BK only option", file=sys.stderr)
        return 1
    if options.sync and not options.base:
        print ("--sync requires --base", file=sys.stderr)
        return 1
    # Echo the full command line into the output for reproducibility.
    print ("# %s" % (' '.join(sys.argv)))
    # normalize to True/False
    # NOTE: order matters -- the index into this list selects the binary
    # name and the VCS class below.  (Python 2: map() returns a list.)
    scms_in_options = map(bool, [options.bk, options.hg, options.git])
    if scms_in_options.count(True) == 0:
        print ("You must specify an SCM using: --bk, --hg, or --git", file=sys.stderr)
        return 1
    if scms_in_options.count(True) > 1:
        print ("Only one of: --bk, --hg, or --git", file=sys.stderr)
        return 1
    # Resolve the SCM binary to an absolute, executable path.
    if not options.binary:
        options.binary = ['bk', 'hg', 'git'][scms_in_options.index(True)]
    if not os.path.isabs(options.binary):
        options.binary = which(options.binary)
    if not options.binary or not canExec(options.binary):
        print ("%s is not an executable" % (options.binary), file=sys.stderr)
        return 1
    if not options.label:
        options.label = options.binary
    # Initialize the VCS object
    # Extra fields hung off 'options' are consumed by runSim(); the
    # parallel path overrides them per worker below.
    options.rand = Random(options.with_cdf)
    options.cv = None
    options.proc_num = None
    options.num_procs = 0
    options.next_proc = None
    options.live_procs = None
    options.mergeprog = None
    # Prefer the repo-local fakemerge.py as the merge driver when present.
    script_path = os.path.dirname(os.path.realpath(__file__))
    fakemerge = os.path.join(script_path, "fakemerge.py")
    if os.path.exists(fakemerge):
        options.mergeprog = fakemerge
    # BitKeeper/Mercurial/Git driver classes are presumably defined
    # earlier in this file (outside this view).
    vcs = [BitKeeper, Mercurial, Git][scms_in_options.index(True)](options)
    if options.parallel:
        wall_time = time.time()
        processes = []
        outfiles = []
        manager = Manager()
        cv = Condition()
        # Shared round-robin token + liveness map used by runSim() to
        # coordinate worker shutdown ordering.
        next_proc = manager.Value('i', 1, lock=False)
        live_procs = manager.list([0] * options.parallel)
        for i in xrange(options.parallel):
            # Each worker gets its own temp log, repo copy, and seed.
            out = tempfile.NamedTemporaryFile(delete=True)
            outfiles.append(out)
            override = {}
            override['repoRoot'] = "%s-copy-%02d" % (options.base, i+1)
            override['seed'] = i+1
            override['proc_num'] = i+1
            override['num_procs'] = options.parallel
            override['next_proc'] = next_proc
            override['live_procs'] = live_procs
            if i > 0:
                # Only the first worker prints the CSV header row.
                override['noHeaders'] = True
            override['cv'] = cv
            p = Process(target=runSim, args=(options, allStart, vcs, out, override))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        ## Collate all the logs into one
        out = tempfile.NamedTemporaryFile(delete=False)
        for f in outfiles:
            f.seek(0)
            for line in f:
                print (line, end='', file=out)
            f.close()
        wall_time = time.time() - wall_time
        print("# Simulator done: Took %s, results in %s" % (timedelta(seconds=wall_time), out.name))
    else:
        # Single-process run writes straight to stdout.
        runSim(options, allStart, vcs)
# Standard script entry point.
if __name__ == "__main__":
    main()