Created
August 24, 2018 19:26
-
-
Save kriswill/83c1aad72b961632866fc012f2578dea to your computer and use it in GitHub Desktop.
vcprompt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""
Usage: vcprompt [options]

Version control information in your prompt.

Attribution: possible original author <Matthias Riegler https://github.com/xvzf>?

Options:
  -f, --format FORMAT     The format string to use.
  -p, --path PATH         The path to run vcprompt on.
  -d, --max-depth DEPTH   The maximum number of directories to traverse.
  -u, --unknown UNKNOWN   The "unknown" value.
  -s, --systems           Print all known VCSs and exit
  -v, --version           Show program's version number and exit
  -h, --help              Show this help message and exit

VCS-specific formatting:
  These options can be used for VCS-specific prompt formatting.

  --format-bzr FORMAT     Bazaar
  --format-cvs FORMAT     CVS
  --format-darcs FORMAT   Darcs
  --format-fossil FORMAT  Fossil
  --format-git FORMAT     Git
  --format-hg FORMAT      Mercurial
  --format-svn FORMAT     Subversion
"""
from subprocess import call, Popen, PIPE | |
from xml.dom.minidom import parseString | |
import optparse | |
import os | |
import re | |
import sys | |
# sqlite3 is only used for Fossil support.  Prefer the standard-library
# module and fall back to the standalone ``pysqlite2`` package.
# ``has_sqlite3`` must be defined on every path: previously it was left
# unset when the pysqlite2 fallback succeeded, which made ``fossil()`` fail
# with a NameError.
try:
    import sqlite3
except ImportError:
    try:
        from pysqlite2 import dbapi2 as sqlite3
    except ImportError:
        sqlite3 = None
has_sqlite3 = sqlite3 is not None
__version__ = (0, 1, 5)

# '--without-environment' must be the very first argument.  This could be
# done in an optparse callback, but then we would have to keep track of
# every option that is affected by the flag.
if '--without-environment' in sys.argv[1:] and \
        sys.argv[1] != '--without-environment':
    sys.stderr.write("The '--without-environment' option must come before "
                     "any other options.\n")
    sys.exit(1)

# The environment has to be scrubbed before the defaults below are read:
# optparse callbacks only run during parsing, which is too late to influence
# values that were already taken from os.environ.
if len(sys.argv) > 1 and sys.argv[1] == '--without-environment':
    # Iterate over a snapshot of the keys; deleting from os.environ while
    # iterating a live key view raises RuntimeError on Python 3.
    for k in list(os.environ.keys()):
        if k.startswith('VCPROMPT'):
            del os.environ[k]
    # hide the flag from optparse, which does not know about it
    del sys.argv[1]

# user editable options (each can be overridden through the environment)
DEPTH = os.environ.get('VCPROMPT_DEPTH', 0)
FORMAT = os.environ.get('VCPROMPT_FORMAT', '%s:%b')
UNKNOWN = os.environ.get('VCPROMPT_UNKNOWN', '(unknown)')

# VCS handler functions; filled in by the ``vcs`` decorator
SYSTEMS = []

# status indicators
STAGED = '*'
MODIFIED = '+'
UNTRACKED = '?'
def helper(*args, **kwargs):
    """
    Print the module's docstring to stdout and exit with status 0.

    Doing this kills two birds with one stone: it adds PEP 257
    compliance and allows us to stop using optparse's built-in
    help flag.

    Any positional or keyword arguments (optparse passes several to its
    callbacks) are accepted and ignored.
    """
    # __doc__ is None when docstrings are stripped (``python -OO``); fall
    # back to an empty string instead of raising AttributeError.
    # sys.stdout.write is used because the print statement is a syntax
    # error on Python 3.
    sys.stdout.write((__doc__ or '').strip() + '\n')
    sys.exit(0)
def systems(*args, **kwargs):
    """
    Print the name of every registered VCS handler, one per line, then exit
    with status 0.

    Any positional or keyword arguments are accepted and ignored.
    """
    for system in SYSTEMS:
        # ``__name__`` works on Python 2 and 3 (``func_name`` is Python 2
        # only) and matches the attribute used elsewhere in this file.
        sys.stdout.write(system.__name__ + '\n')
    sys.exit(0)
def vcs(file):
    """
    Decorator factory that registers a VCS handler.

    The decorated function is tagged with a ``file`` attribute (the value
    given here) and appended to ``SYSTEMS``; the function itself is handed
    back unchanged.
    """
    def register(handler):
        handler.file = file
        SYSTEMS.append(handler)
        return handler
    return register
def version(*args):
    """
    Print ``vcprompt <version>`` to stdout and exit with status 0.

    The version is the dotted form of the module-level ``__version__``
    tuple.  Positional arguments are accepted and ignored.
    """
    # sys.stdout.write instead of the Python 2-only print statement
    sys.stdout.write('vcprompt %s\n' % '.'.join(map(str, __version__)))
    sys.exit(0)
def vcprompt(options):
    """
    Returns a formatted version control string for use in a shell prompt
    or elsewhere.

    Starting at ``options.path`` and walking up towards the filesystem
    root, the first directory containing a registered system's marker file
    decides which handler produces the prompt.

    Arguments:
        ``options``
            An optparse.Values instance.  ``path``, ``format`` and ``file``
            are overwritten on it while searching.

    Returns the handler's output, or ``''`` when nothing was found (missing
    path, depth limit reached, or root reached).

    Side effect: the process' working directory is changed to each
    directory that is inspected.
    """
    options.path = os.path.abspath(os.path.expanduser(options.path))

    # A path pointing at a file cannot be chdir()'d into; start the search
    # from the directory that contains it.
    if os.path.isfile(options.path):
        options.path = os.path.dirname(options.path)

    count = 0
    while True:
        # bail out on non-existent paths
        if not os.path.exists(options.path):
            break

        # We need to change the current working directory or the '--path'
        # flag might not work correctly with some formatting args.
        # It's easier to do this here, rather than in every VCS function
        if options.path != os.getcwd():
            os.chdir(options.path)

        for system in SYSTEMS:
            # marker files are relative, i.e. resolved against the cwd
            if not os.path.exists(system.file):
                continue

            # set up custom formatting
            vcs_format = getattr(options, 'format-' + system.__name__, None)
            if vcs_format:
                options.format = vcs_format

            # the "vcs" file
            options.file = system.file

            prompt = system(options)
            if prompt is not None:
                return prompt

        # a depth of 0 means "no limit"
        if options.depth:
            if count == options.depth:
                break
            count += 1

        # os.path.dirname is separator-agnostic and returns its argument
        # unchanged at the root.  Splitting on '/' never terminated for
        # paths without a forward slash and skipped the root directory.
        parent = os.path.dirname(options.path)
        if parent == options.path:
            break
        options.path = parent
    return ''
def main():
    """
    Build the command-line parser, parse ``sys.argv`` and return the prompt
    string produced by ``vcprompt`` for the parsed options.
    """
    parser = optparse.OptionParser()

    # drop optparse's generated help in favour of our own --help flag
    parser.remove_option('--help')

    # flags handled entirely by a callback
    callbacks = (
        ('-h', '--help', helper),
        ('-s', '--systems', systems),
        ('-v', '--version', version),
    )
    for short_flag, long_flag, handler in callbacks:
        parser.add_option(short_flag, long_flag, action='callback',
                          callback=handler)

    # options carrying a value; defaults come from the module-level settings
    parser.add_option('-f', '--format', dest='format', default=FORMAT)
    parser.add_option('-p', '--path', dest='path', default='.')
    parser.add_option('-d', '--max-depth', dest='depth', type='int',
                      default=DEPTH)
    parser.add_option('-u', '--unknown', dest='unknown', default=UNKNOWN)

    # one --format-<name> option per registered system, defaulting to the
    # matching VCPROMPT_FORMAT_<NAME> environment variable (None if unset)
    for system in SYSTEMS:
        name = system.__name__
        env_key = 'VCPROMPT_FORMAT_%s' % name.upper()
        dest = 'format-%s' % name
        parser.add_option('--%s' % dest, dest=dest,
                          default=os.environ.get(env_key, None))

    options, _ = parser.parse_args()
    return vcprompt(options)
@vcs(file=".bzr/branch/last-revision")
def bzr(options):
    """
    Bazaar

    The Bazaar version control system

    Expands the tokens in ``options.format``: ``%b`` (basename of
    ``options.path``), ``%h`` (short sha), ``%r`` (revision number),
    ``%m``/``%u`` (modified/untracked indicators) and ``%s``/``%n``
    (the literal ``bzr``).  Values that cannot be determined are replaced
    by ``options.unknown``.
    """
    revision = sha = modified = untracked = options.unknown

    # local revision or global sha
    if re.search('%(r|h)', options.format):
        line = ''
        try:
            with open(options.file, 'r') as fh:
                line = fh.readline().strip()
        except IOError:
            pass

        # the first line is expected to read "<revno> <revision-id>";
        # anything else leaves both values unknown
        fields = line.split(' ', 1)
        if len(fields) == 2:
            revision, sha = fields
            # compensate for empty Bazaar repositories
            if sha == 'null:':
                sha = options.unknown
            else:
                sha = sha.rsplit('-', 1)[-1][:7]

    # status (modified/untracked)
    if re.search('%[mu]', options.format):
        # universal_newlines gives text output on Python 2 and 3 alike;
        # stderr is captured so error messages do not leak into the prompt
        process = Popen(['bzr', 'status', '--short'], stdout=PIPE,
                        stderr=PIPE, universal_newlines=True)
        output = process.communicate()[0]
        if process.returncode == 0:
            # a successful status run means both flags are known, even
            # when no line matches
            modified = untracked = ''
            for line in output.splitlines():
                # NOTE(review): short status appears to use two flag
                # columns (versioning, then content), so 'M' may sit in
                # either of the first two columns -- confirm against bzr.
                flags = line[:2]
                if 'M' in flags:
                    modified = MODIFIED
                elif flags.startswith('?'):
                    untracked = UNTRACKED

    # formatting
    output = options.format
    output = output.replace('%b', os.path.basename(options.path))
    output = output.replace('%h', sha)
    output = output.replace('%r', revision)
    output = output.replace('%m', modified)
    output = output.replace('%u', untracked)
    output = output.replace('%s', 'bzr')
    output = output.replace('%n', 'bzr')
    return output
@vcs(file="CVS")
def cvs(options):
    """
    CVS

    Concurrent Versions System.

    No repository details are looked up for CVS: every informational token
    expands to ``options.unknown`` and only ``%s``/``%n`` yield ``cvs``.
    """
    prompt = options.format
    for token in ('%b', '%h', '%r', '%m', '%u'):
        prompt = prompt.replace(token, options.unknown)
    for token in ('%s', '%n'):
        prompt = prompt.replace(token, 'cvs')
    return prompt
@vcs(file="_darcs/hashed_inventory")
def darcs(options):
    """
    Darcs

    Distributed. Interactive. Smart.

    Expands the tokens in ``options.format``: ``%b`` (basename of
    ``options.path``), ``%h``/``%r`` (short patch hash), ``%m``/``%u``
    (modified/untracked indicators) and ``%s``/``%n`` (the literal
    ``darcs``).  Values that cannot be determined are replaced by
    ``options.unknown``.
    """
    sha = modified = untracked = options.unknown

    # sha
    if re.search('%(h|r)', options.format):
        process = Popen(['darcs', 'changes', '--last', '1', '--xml'],
                        stdout=PIPE, stderr=PIPE)
        output = process.communicate()[0]
        if process.returncode == 0:
            dom = parseString(output)
            patches = dom.getElementsByTagName("patch")
            # a repository without patches yields no <patch> element
            if patches:
                patch = patches[0].getAttribute("hash")
                sha = patch.rsplit('-', 1)[-1].split('.')[0][:7]

    # branch
    # darcs doesn't have in-repo local branching (yet), so just use
    # the directory name for now
    # see also: http://bugs.darcs.net/issue555
    branch = os.path.basename(options.path)

    # modified
    if re.search('%[mu]', options.format):
        process = Popen(['darcs', 'whatsnew', '-l', '-s'], stdout=PIPE,
                        stderr=PIPE, universal_newlines=True)
        output = process.communicate()[0]
        # exit status 1 is treated as "nothing changed", 0 as "changes
        # listed"; any other status leaves both flags unknown
        if process.returncode in (0, 1):
            modified = untracked = ''
        if process.returncode == 0:
            # iterate over lines; looping over the string itself would walk
            # it character by character
            for line in output.splitlines():
                line = line.strip()
                if line.startswith('M'):
                    modified = MODIFIED
                elif line.startswith('a'):
                    untracked = UNTRACKED

    # formatting
    output = options.format
    output = output.replace('%b', branch)
    output = output.replace('%h', sha)
    output = output.replace('%r', sha)
    output = output.replace('%m', modified)
    output = output.replace('%u', untracked)
    output = output.replace('%s', 'darcs')
    output = output.replace('%n', 'darcs')
    return output
def _fossil_query(database, query, params=()):
    """
    Return the first column of the first row that ``query`` yields on the
    SQLite file ``database``.

    Returns None when the file is missing, cannot be queried, or the query
    produces no rows.
    """
    # sqlite3.connect() silently creates missing files; never do that for a
    # path we merely read from another database.
    if not os.path.isfile(database):
        return None
    conn = None
    try:
        try:
            conn = sqlite3.connect(database)
            cursor = conn.cursor()
            cursor.execute(query, params)
            row = cursor.fetchone()
        except sqlite3.Error:
            return None
    finally:
        if conn:
            conn.close()
    if row is None:
        return None
    return row[0]


@vcs(file="_FOSSIL_")
def fossil(options):
    """
    Fossil

    The Fossil version control system.

    Expands the tokens in ``options.format``: ``%b`` (branch), ``%h``/``%r``
    (short sha), ``%m``/``%u`` (modified/untracked indicators) and
    ``%s``/``%n`` (the literal ``fossil``).  Branch and sha are read via
    SQLite and stay at ``options.unknown`` when no sqlite3 module is
    available or a lookup fails.
    """
    branch = sha = modified = untracked = options.unknown

    if has_sqlite3:
        # all this just to get the repository file :(
        repository = _fossil_query(
            options.file,
            "SELECT value FROM vvar where name = 'repository'")

        # grab the sha from the repo
        if repository is not None:
            uuid = _fossil_query(
                repository,
                "SELECT uuid from blob ORDER BY rid DESC LIMIT 1")
            if uuid:
                sha = uuid[:7]

        # branch
        if sha != options.unknown:
            # the prefix match is passed as a bound parameter rather than
            # interpolated into the SQL text
            tag = _fossil_query(
                repository,
                """SELECT value FROM tagxref WHERE rid =
                (SELECT rid FROM blob WHERE uuid LIKE ?)
                AND value is not NULL LIMIT 1 """,
                (sha + '%',))
            if tag is not None:
                branch = tag

    # modified
    if '%m' in options.format:
        process = Popen(['fossil', 'changes'], stdout=PIPE, stderr=PIPE)
        output = process.communicate()[0]
        if process.returncode == 0:
            if output:
                modified = MODIFIED
            else:
                modified = ''

    # untracked files
    if '%u' in options.format:
        process = Popen(['fossil', 'extras'], stdout=PIPE, stderr=PIPE)
        output = process.communicate()[0]
        if process.returncode == 0:
            if output:
                untracked = UNTRACKED
            else:
                untracked = ''

    # parse out formatting string
    output = options.format
    output = output.replace('%b', branch)
    output = output.replace('%h', sha)
    output = output.replace('%r', sha)
    output = output.replace('%m', modified)
    output = output.replace('%u', untracked)
    output = output.replace('%s', 'fossil')
    output = output.replace('%n', 'fossil')
    return output
@vcs(file=".git")
def git(options):
    """
    Git

    The fast version control system.

    Expands the tokens in ``options.format``: ``%b`` (branch), ``%h``/``%r``
    (short sha), ``%m``/``%u``/``%a`` (modified/untracked/staged
    indicators) and ``%s``/``%n`` (the literal ``git``).  Values that cannot
    be determined are replaced by ``options.unknown``.
    """
    staged = branch = sha = modified = untracked = options.unknown
    detached = ''

    def revstring(ref, chars=7):
        """Return the first ``chars`` characters of the file ``ref``'s
        first line, or '' when the file is missing or unreadable."""
        try:
            with open(ref, 'r') as fh:
                return fh.readline().strip()[0:chars]
        except IOError:
            return ''

    def packed_ref(name, chars=7):
        """Look ``name`` up in the packed-refs file; '' when not found."""
        try:
            with open(os.path.join(options.file, 'packed-refs'), 'r') as fh:
                for line in fh:
                    fields = line.split()
                    if len(fields) == 2 and fields[1] == name:
                        return fields[0][:chars]
        except IOError:
            pass
        return ''

    # the current branch is required to get the sha
    if re.search('%(b|r|h)', options.format):
        head = ''
        try:
            with open(os.path.join(options.file, 'HEAD'), 'r') as fh:
                head = fh.readline().strip()
        except IOError:
            pass

        prefix = 'ref: refs/heads/'
        if head.startswith(prefix):
            # keep everything after the prefix so that branch names
            # containing slashes (e.g. "feature/x") are not truncated
            branch = head[len(prefix):] or options.unknown
        elif re.match('[0-9a-f]{40}$', head):
            # detached HEAD: the file holds the commit id itself
            detached = head

    # sha/revision
    if re.search('%(r|h)', options.format):
        if branch != options.unknown:
            ref = 'refs/heads/%s' % branch
            # refs that have been packed no longer exist as loose files
            sha = revstring(os.path.join(options.file, ref)) or \
                packed_ref(ref)
        elif detached:
            sha = detached[:7]

    # modified
    if '%m' in options.format:
        # Popen + communicate() drains the pipes; call() with PIPE can
        # block forever once the child fills the pipe buffer.
        process = Popen(['git', 'diff', '--name-status', '--exit-code'],
                        stdout=PIPE, stderr=PIPE)
        process.communicate()
        if process.returncode == 1:
            modified = MODIFIED
        else:
            modified = ''

    # untracked files
    if '%u' in options.format:
        process = Popen(['git', 'ls-files', '--other', '--exclude-standard'],
                        stdout=PIPE, stderr=PIPE)
        output = process.communicate()[0]
        if process.returncode == 0:
            # truthiness works for both str (Python 2) and bytes (Python 3)
            if output:
                untracked = UNTRACKED
            else:
                untracked = ''

    # staged files
    if '%a' in options.format:
        process = Popen(['git', 'diff', '--name-only', '--cached'],
                        stdout=PIPE, stderr=PIPE)
        output = process.communicate()[0]
        if process.returncode == 0:
            if output:
                staged = STAGED
            else:
                staged = ''

    # formatting
    output = options.format
    output = output.replace('%b', branch)
    output = output.replace('%h', sha)
    output = output.replace('%r', sha)
    output = output.replace('%m', modified)
    output = output.replace('%u', untracked)
    output = output.replace('%a', staged)
    output = output.replace('%s', 'git')
    output = output.replace('%n', 'git')
    return output
@vcs(file=".hg")
def hg(options):
    """
    Mercurial

    The Mercurial version control system.

    Expands the tokens in ``options.format``: ``%b`` (branch), ``%h``
    (short sha), ``%r`` (revision), ``%m``/``%u`` (modified/untracked
    indicators) and ``%s``/``%n`` (the literal ``hg``).  Values that cannot
    be determined are replaced by ``options.unknown``.
    """
    branch = revision = sha = modified = untracked = options.unknown

    # changeset ID or global sha
    if re.search('%(r|h)', options.format):
        fields = []
        try:
            with open(os.path.join(options.file, 'cache/branchheads'),
                      'r') as fh:
                fields = fh.readline().split()
        except IOError:
            pass
        # the first line is expected to hold exactly "<sha> <revision>";
        # anything else leaves both values unknown
        if len(fields) == 2:
            sha, revision = fields[0][:7], fields[1]

    # branch
    if '%b' in options.format:
        # NOTE(review): '.hg/branch' is Mercurial's record of the working
        # directory's branch, while 'undo.branch' is rollback data; the
        # latter is kept only as a fallback for when 'branch' is absent.
        for name in ('branch', 'undo.branch'):
            try:
                with open(os.path.join(options.file, name), 'r') as fh:
                    branch = fh.readline().strip()
                break
            except IOError:
                continue

    # modified
    if '%m' in options.format:
        process = Popen(['hg', 'status', '--modified'], stdout=PIPE,
                        stderr=PIPE)
        output = process.communicate()[0].strip()
        if process.returncode == 0:
            # truthiness works for both str (Python 2) and bytes (Python 3)
            if output:
                modified = MODIFIED
            else:
                modified = ''

    # untracked
    if '%u' in options.format:
        process = Popen(['hg', 'status', '--unknown'], stdout=PIPE,
                        stderr=PIPE)
        output = process.communicate()[0]
        # only trust the output of a successful run, as for %m above
        if process.returncode == 0:
            if output:
                untracked = UNTRACKED
            else:
                untracked = ''

    output = options.format
    output = output.replace('%b', branch)
    output = output.replace('%h', sha)
    output = output.replace('%r', revision)
    output = output.replace('%m', modified)
    output = output.replace('%u', untracked)
    output = output.replace('%s', 'hg')
    output = output.replace('%n', 'hg')
    return output
@vcs(file=".svn/entries")
def svn(options):
    """
    Subversion

    The Subversion version control system.

    Expands the tokens in ``options.format``: ``%b`` (``trunk``, ``tags``
    or ``branches``, taken from the repository URL), ``%r``/``%h``
    (revision number), ``%m``/``%u`` (modified/untracked indicators) and
    ``%s``/``%n`` (the literal ``svn``).  Values that cannot be determined
    are replaced by ``options.unknown``.
    """
    branch = revision = modified = untracked = options.unknown

    want_branch = '%b' in options.format
    want_revision = bool(re.search('%(r|h)', options.format))

    # branch and revision both come from `svn info`; skip the subprocess
    # entirely when the format asks for neither
    if want_branch or want_revision:
        # pass the path as its own argument so that paths containing
        # whitespace are not split apart
        process = Popen(['svn', 'info', options.path], stdout=PIPE,
                        stderr=PIPE, universal_newlines=True)
        output = process.communicate()[0]
        if process.returncode == 0:
            # compile some regexes
            branch_regex = re.compile('((tags|branches)|trunk)')
            revision_regex = re.compile(r'^Revision: (?P<revision>\d+)')

            for line in output.split('\n'):
                # branch
                if want_branch and line.startswith('URL:'):
                    matches = branch_regex.search(line)
                    if matches:
                        branch = matches.group(1)

                # revision/sha
                if want_revision and line.startswith('Revision:'):
                    matches = revision_regex.search(line)
                    # the line may not carry a numeric revision
                    if matches:
                        revision = matches.group('revision')

    # modified
    if re.search('%[mu]', options.format):
        process = Popen(['svn', 'status'], stdout=PIPE, stderr=PIPE,
                        universal_newlines=True)
        output = process.communicate()[0]
        if process.returncode == 0:
            # first column of every non-empty status line
            codes = [line[0] for line in output.split('\n') if line]
            # a successful run means both flags are known, even when no
            # line carries the matching code
            modified = ''
            untracked = ''
            if 'M' in codes:
                modified = MODIFIED
            if '?' in codes:
                untracked = UNTRACKED

    # formatting
    output = options.format
    output = output.replace('%r', revision)
    output = output.replace('%h', revision)
    output = output.replace('%b', branch)
    output = output.replace('%m', modified)
    output = output.replace('%u', untracked)
    output = output.replace('%s', 'svn')
    output = output.replace('%n', 'svn')
    return output
if __name__ == '__main__':
    # Print the prompt when one was produced; otherwise exit non-zero so
    # that shell callers can tell that no VCS was detected.
    prompt = main()
    if prompt:
        # sys.stdout.write instead of the Python 2-only print statement
        sys.stdout.write(prompt + '\n')
    else:
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment