Skip to content

Instantly share code, notes, and snippets.

@philpennock
Last active March 25, 2025 02:35
Show Gist options
  • Save philpennock/4dbeb8e053c77e6e3a6507975c0ff4ff to your computer and use it in GitHub Desktop.
Save philpennock/4dbeb8e053c77e6e3a6507975c0ff4ff to your computer and use it in GitHub Desktop.
Find VCS repos under the cwd and report their paths
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.7"
# ///
#
# f-strings alone would allow 3.6, but this script also uses `dataclasses`
# and `subprocess.run(capture_output=...)`, both of which need 3.7; 3.7 is
# also the earliest I can test with, so the PEP 723 metadata declares >= 3.7.
"""
find_to_repo: find all dirs under the reference point which are repos
Stop at repos, don't descend into the repos.
Handling submodules etc is explicitly out of scope.
Use --help to see help.
Assumes under current directory if no directories given.
"""
# If updating this, then also update the Public gist:
# * [email protected]:4dbeb8e053c77e6e3a6507975c0ff4ff.git
# * https://gist.github.com/philpennock/4dbeb8e053c77e6e3a6507975c0ff4ff
__author__ = '[email protected] (Phil Pennock)'
import argparse
import dataclasses
import os
import pathlib
import shlex
import stat
import subprocess
import sys
# Hard-coded as internal logic: a directory containing ALL of these
# sub-directories is treated as a bare git repo.
GIT_BARE_NEEDALL_DIRS = frozenset({'info', 'objects', 'refs'})
# Flag-file names, shared with some update tooling.
SKIP_CHILDREN_FILENAME = '.skip-children'  # selectively skip children
SKIP_ALL_FILENAME = '.skip-updates'  # "prune here"
REPO_RECURSE_FLAGFILES = frozenset({'.update-children', SKIP_ALL_FILENAME})
class Error(Exception):
    """Root of the exception hierarchy raised by find_to_repo."""
class Exit(Error):
    """An error which ends the program cleanly, without a stack trace.

    The process exit code to use is carried in ``status`` (default 1).
    """

    # Exit status used for command-line usage errors.
    USAGE = 64

    status: int

    def __init__(self, *args, status: int = 1, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.status = status
@dataclasses.dataclass
class RepoFilter:
    """One git-config boolean requirement which a repo has to satisfy."""

    # Name of the git config boolean to look up.
    Bool: str
    # Value the lookup must produce for the repo to pass the filter.
    Need: bool
    # Value to assume when the lookup produces nothing.
    Default: bool
class Walker(object):
def __init__(self, options):
self.repo_dirs = set(options.repo_dirs)
self.bundle_exts = set(options.bundle_exts)
self.dir_exts = set(options.dir_exts)
self.only_show = set(options.only_repo_types)
self.use_only_show = bool(self.only_show)
self.exclude_paths = set(options.exclude_paths)
self.ignore_paths = set(options.ignore_paths)
self.exclude_dirs = set(options.exclude_dirs)
self.external_files = set(options.external_file)
self.required_meta_file = options.meta_file
self.required_exists_file = options.exists_file
self.print_file = options.print_file
self.quiet = options.quiet
self.verbose = options.verbose
self.symlinks_file = None
self.symlinks_follow = options.symlinks_follow
self.warn_no_flagfiles = options.warn_no_flagfiles
self.obey_skips = options.obey_skips
self.skip_archived = options.skip_archived
self.git_command = options.git_cmd
self.git_repo_set_bool_filters(options)
self.shown_count = 0
if options.null_terminate:
self.field_separator = '\0'
self.print_end = '\0'
else:
self.field_separator = '\t'
self.print_end = '\n'
if options.symlinks_fd is not None and options.symlinks_fd >= 0:
self.symlinks_file = os.fdopen(options.symlinks_fd, mode='wt', buffering=1, encoding='UTF-8')
self.required_exists_in_any_dir = None
self.required_exists_pattern = None
if options.exists_pattern:
if '*' in options.exists_pattern or '?' in options.exists_pattern:
self.required_exists_pattern = options.exists_pattern
else:
self.required_exists_in_any_dir = options.exists_pattern
def _report_one(self, full_repo: str, ext: str) -> None:
self.shown_count += 1
if self.quiet:
return
elif self.verbose:
if callable(ext):
ext = ext()
print(f'{ext}{self.field_separator}{full_repo}', end=self.print_end)
else:
print(full_repo, end=self.print_end)
def _report_one_resolving(self, repo_root: str, file_within: str, vcs_subdir: str) -> None:
if self.print_file:
real = os.path.realpath(file_within)
if repo_root.startswith('./'):
relative = os.path.relpath(real)
if relative.startswith('../'):
relative = real
elif relative.startswith('/'):
pass
else:
relative = './' + relative
self._report_one(relative, vcs_subdir)
else:
self._report_one(real, vcs_subdir)
else:
self._report_one(repo_root, vcs_subdir)
def find_under(self, top):
    """Walk the tree rooted at ``top``, reporting each repo that is found.

    The walk is top-down and prunes ``dirs`` in place so that it does not
    descend into a repo once one has been recognised.  For our purposes, a
    repo is one of:
      1. A file (bundle, recognised by filename extension)
      2. A directory with a flag-directory within it (eg a .git/ sub-dir)
      3. A directory with a flag-FILE within it (.git worktree, etc)
      4. A directory named a particular way (bare .git repos)
      5. Fallback for bare git repos not named that way

    Results are emitted through ``_report_one``; nothing is returned.
    """
    sep = os.path.sep
    # Entries without a directory separator are taken as relative to ``top``.
    exclude = {x if sep in x else top + sep + x for x in self.exclude_paths}
    ignore_paths = {x if (sep in x or x == top) else top + sep + x for x in self.ignore_paths}

    for root, dirs, files in os.walk(top, topdown=True, followlinks=self.symlinks_follow):
        if root in exclude:
            del dirs[:]
            continue
        if root in ignore_paths:
            # Not a repo for our purposes, but do look inside it.
            continue
        if self.exclude_dirs:
            # Must mutate in place: os.walk only honours changes to this list object.
            dirs[:] = [d for d in dirs if d not in self.exclude_dirs]

        is_repo_thisdir = False

        # 1. Bundle files.
        for bundle in files:
            ext = os.path.splitext(bundle)[1]
            if ext not in self.bundle_exts:
                continue
            is_repo_thisdir = True
            full_repo = os.path.join(root, bundle)
            if self.use_only_show and ext not in self.only_show:
                continue
            # File, no --meta-file support
            if self.required_meta_file or self.required_exists_file:
                continue
            if full_repo in ignore_paths:
                continue
            self._report_one(full_repo, ext)

        # 2. A checkout dir which contains a meta dir inside it, eg a .git/ sub-dir.
        dirs_set = set(dirs)
        meta_dirs = self.repo_dirs & dirs_set
        if meta_dirs:
            is_repo_thisdir = True
            report = None
            printable_list = []
            # Only consider meta dirs which are really present here; with
            # --only-repo-type, further restrict to the requested types.
            wanted = (meta_dirs & self.only_show) if self.use_only_show else meta_dirs

            def repo_types():
                # Sorted so the verbose label is stable between runs.
                return ','.join(sorted(meta_dirs))

            # the ignore_paths is handled by the path to the repo being root, so top of function
            if wanted:
                printable = root
                if self.required_exists_file:
                    printable = os.path.join(root, self.required_exists_file)
                first_repo_subdir = sorted(wanted)[0]
                if self.skip_archived and first_repo_subdir == '.git' and self.git_repo_is_archived(root):
                    pass  # be careful, still need to del dirs[:] below
                elif not self.git_repo_meets_bool_filters(root):
                    pass
                elif self.required_meta_file:
                    p = os.path.join(root, first_repo_subdir, self.required_meta_file)
                    if os.path.exists(p):
                        report = p if self.print_file else printable
                elif self.required_exists_file:
                    # here ``printable`` is the required file's path
                    if os.path.exists(printable):
                        report = printable
                elif not self.required_exists_pattern and not self.required_exists_in_any_dir:
                    report = printable
                # This is separate so we can use both -e and -E
                # NOTE(review): these lookups run even when the archived/bool
                # filters above rejected the repo -- confirm that is intended.
                if self.required_exists_in_any_dir is not None:
                    printable_list += sorted(pathlib.Path(root).rglob(self.required_exists_in_any_dir))
                if self.required_exists_pattern is not None:
                    printable_list += sorted(pathlib.Path(root).glob(self.required_exists_pattern))

            if report is not None:
                self._report_one(report, repo_types)
            for item in printable_list:
                # sigh, the pathlib approach skips the './' at the start
                item = str(item)
                if not item.startswith('/'):
                    item = './' + item
                self._report_one(item, repo_types)
            del dirs[:]

        # 3. A .git FILE: a worktree or submodule or something.
        #    For us to see it, _probably_ worktree.
        if '.git' in files:
            vcs_subdir = '.git'
            if self.skip_archived and self.git_repo_is_archived(root):
                continue
            if not self.git_repo_meets_bool_filters(root):
                continue
            git_dir = self._read_gitfile_target(os.path.join(root, vcs_subdir))
            if git_dir is not None:
                # A relative gitdir is relative to the directory holding the
                # .git file, not to our cwd; join() leaves absolute paths alone.
                git_dir = os.path.join(root, git_dir)
                if self.required_exists_file:
                    # This is relative to the working tree, so 'root' is correct
                    p = os.path.join(root, self.required_exists_file)
                    if os.path.exists(p):
                        self._report_one(p, vcs_subdir)
                elif self.required_meta_file:
                    # NB: 'config' does not exist inside the real git-dir for a working-tree, that's shared-only.
                    # Thus our implementing the common_dir_ptr check.
                    common_dir_ptr = os.path.join(git_dir, 'commondir')
                    p = os.path.join(git_dir, self.required_meta_file)
                    if os.path.exists(p):
                        self._report_one_resolving(root, p, vcs_subdir)
                    elif os.path.exists(common_dir_ptr):
                        with open(common_dir_ptr) as fh:
                            common_dir = fh.read().strip()
                        p_common = os.path.join(git_dir, common_dir, self.required_meta_file)
                        if os.path.exists(p_common):
                            self._report_one_resolving(root, p_common, vcs_subdir)
                else:
                    self._report_one(root, vcs_subdir)

        # 4. Child dirs which are a "git bare repo" or moral equivalent,
        #    hinted at by the filename extension.  Kept in walk order so the
        #    output order is stable.
        byname = [d for d in dirs if os.path.splitext(d)[1] in self.dir_exts]
        if byname:
            for d in byname:
                full_repo, ext = os.path.join(root, d), os.path.splitext(d)[1]
                if self.use_only_show and ext not in self.only_show:
                    continue
                if self.required_meta_file and not os.path.exists(os.path.join(full_repo, self.required_meta_file)):
                    continue
                if self.required_exists_file:
                    # this can't exist inside a bare repo
                    continue
                self._report_one(full_repo, ext)
            # Whether reported or not, never descend into them.
            named = set(byname)
            dirs[:] = [d for d in dirs if d not in named]

        # 5. This directory is itself a git bare repo, not named to have .git extension.
        if GIT_BARE_NEEDALL_DIRS <= dirs_set:
            is_repo_thisdir = True
            if self.skip_archived and self.git_repo_is_archived(root):
                pass
            elif not self.git_repo_meets_bool_filters(root):
                pass
            elif self.use_only_show and '.git' not in self.only_show:
                pass
            elif os.path.dirname(root) in ignore_paths:
                # if we skip a git repo foo, then skip the .git dir inside it too
                pass
            elif self.required_meta_file and not os.path.exists(os.path.join(root, self.required_meta_file)):
                pass
            elif self.required_exists_file:
                # this can't exist inside a bare repo
                pass
            else:
                self._report_one(root, '.git')
            del dirs[:]

        if self.warn_no_flagfiles and not is_repo_thisdir:
            if not set(files).intersection(REPO_RECURSE_FLAGFILES):
                print(f'# WARNING: not a repo, no flag-files: {root}', file=sys.stderr)
            # Ideally, we'd detect SKIP_CHILDREN_FILENAME and in sub-dirs under that, suppress this warning, by mutating a context only passed to child dirs.
            # But the API doesn't offer us that context.  So instead I added --obey-skips: a crude bodge.

        if self.external_files and not is_repo_thisdir:
            # This does not use --quiet because we've explicitly asked for these specific files; if don't want to see, don't ask.
            # Use-case: use --quiet in combination with this, to find control files which exist outside of repos.
            for filename in sorted(set(files).intersection(self.external_files)):
                print(os.path.join(root, filename))

        if self.obey_skips and SKIP_ALL_FILENAME in files:
            del dirs[:]
        if self.obey_skips and SKIP_CHILDREN_FILENAME in files:
            with open(os.path.join(root, SKIP_CHILDREN_FILENAME)) as fh:
                entries = [line.rstrip() for line in fh]
            for entry in entries:
                # blank lines and '#' comment lines are not directory names
                if entry and not entry.startswith('#') and entry in dirs:
                    dirs.remove(entry)

        if self.symlinks_file is not None and not is_repo_thisdir:
            # It's already in dirs, so there's already been a stat ... I need to decide when to switch to that newer walk API which caches this
            for e in dirs:
                fn = os.path.join(root, e)
                st = os.lstat(fn)
                if stat.S_ISLNK(st.st_mode):
                    print(f'{fn} -> {os.readlink(fn)}', file=self.symlinks_file)

def _read_gitfile_target(self, gitfile):
    """Return the path from the first ``gitdir:`` line of ``gitfile``, or None.

    None is returned if the file cannot be read, has no such line, or the
    line names no path.  The path is returned as written (it may be relative).
    """
    marker = 'gitdir:'
    try:
        with open(gitfile) as fh:
            for line in fh:
                line = line.strip()
                if line.startswith(marker):
                    # Keep everything after the marker so paths with spaces survive.
                    return line[len(marker):].strip() or None
    except (OSError, UnicodeError):
        # Unreadable stub: treat the same as "no gitdir found".
        pass
    return None
def git_config_get_bool(self, repo_path: str, option: str, default: bool) -> bool:
# This is not as efficient as normal for this tool, but it's "correct"
# We could consider cheating and reading files directly, but I think the --skip-archived case is rare enough that I will live with the overhead.
# (On my 2019 vintage laptop 2025-02: 217 repos, 1 archived, time take goes from around 0.115s to around 0.593s; noticeable but tolerable.)
#
# 2025-02: this should be: git config get --type bool --local -- <option>
# but the switch of git-config to taking subcommands is still new enough and I don't have current git on all the places I use find_to_repo
# so for now we stick to the older invocation syntax. Which _for this case_ is just "--get" instead of "get".
cmdline = [self.git_command, "-C", repo_path, "config", "--get", "--type", "bool", "--local", "--", option]
value = subprocess.run(cmdline, capture_output=True).stdout.rstrip().decode('US-ASCII')
if value == '':
return default
elif value == 'true':
return True
elif value == 'false':
return False
else:
raise Exit(f'failed to parse {value!r} as return value from: {shlex.join(cmdline)}')
def git_repo_is_archived(self, path: str) -> bool:
"""Return whether or not the repo should be considered "archived".
The concept could vary, but for our purposes it's "the forge which is upstream has marked it archived", and we don't have forge communication in this tool.
Instead, we use the git config bool pdp.forge-has-archived as our current indicator.
What manages that setting is not something we care about.
"""
return self.git_config_get_bool(path, 'pdp.forge-has-archived', False)
def git_repo_set_bool_filters(self, options: argparse.Namespace) -> None:
self._has_git_bool_filters = False
self._git_bool_filters = []
for item in options.require_repo_bool_true:
self._git_bool_filters.append(RepoFilter(Bool=item, Need=True, Default=False))
for item in options.require_repo_bool_false:
self._git_bool_filters.append(RepoFilter(Bool=item, Need=False, Default=True))
for item in options.require_repo_bool_not_true:
self._git_bool_filters.append(RepoFilter(Bool=item, Need=False, Default=False))
if self._git_bool_filters:
self._has_git_bool_filters = True
def git_repo_meets_bool_filters(self, path: str) -> bool:
if not self._has_git_bool_filters:
return True
for repo_filter in self._git_bool_filters:
if self.git_config_get_bool(path, repo_filter.Bool, repo_filter.Default) != repo_filter.Need:
return False
return True
def _main(args, argv0):
    """Parse ``args``, walk every requested top directory, and return 0.

    ``argv0`` is the program name used to prefix diagnostics written to
    stderr.  Raises Exit for option problems found after argument parsing.
    """
    # The module docstring doubles as the help description, minus its own
    # "use --help" line.
    description = '\n'.join(line for line in __doc__.split('\n') if '--help' not in line)
    parser = argparse.ArgumentParser(
        add_help=False,
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS, help=argparse.SUPPRESS)

    # FIXME: Technically CVSROOT is a collection of repos as it's the server side,
    # so we need to then list all the sibling directories as CVS stores (not check-outs).
    repo_stops = ['.git', '.hg', 'CVS', 'CVSROOT', '.bzr', '.svn', '.sl']
    bundle_exts = ['.fossil']
    dir_exts = ['.git']
    known_types = frozenset(set(repo_stops) | set(bundle_exts) | set(dir_exts))

    # Option groups, in the order they appear in --help.
    recursion = parser.add_argument_group('Recursion Controls')
    vcs_types = parser.add_argument_group('VCS Type Controls')
    skipping = parser.add_argument_group('Repo Skip Controls')
    reporting = parser.add_argument_group('Reporting Controls')
    loudness = reporting.add_mutually_exclusive_group()

    recursion.add_argument(
        '-x', '--exclude-path', '--exclude',  # --exclude included here for backwards compatibility
        action='append', dest='exclude_paths', default=[], metavar='PREFIX',
        help='skip anything under one of these prefices')
    recursion.add_argument(
        '-X', '--exclude-dir',
        action='append', dest='exclude_dirs', default=[], metavar='DIR',
        help='skip looking inside any dir named for one of these')
    recursion.add_argument(
        '-i', '--ignore-paths',
        action='append', dest='ignore_paths', default=[], metavar='PREFIX',
        help='ignore these paths being repos, look inside them')

    vcs_types.add_argument(
        '-o', '--only-repo-type',
        action='append', dest='only_repo_types', metavar='TYPE+',
        choices=known_types, default=[],
        help='only print for these repo dirs (available: %(choices)s) (default: %(default)s)')
    vcs_types.add_argument(
        '-r', '--repo-dir',
        action='append', dest='repo_dirs', default=repo_stops, metavar='DIR',
        help='Add to list of repo dirs (default %(default)s)')
    vcs_types.add_argument(
        '-f', '--bundle-ext',
        action='append', dest='bundle_exts', default=bundle_exts, metavar='EXT',
        help='Add to list of repo file extensions (default %(default)s)')
    vcs_types.add_argument(
        '-d', '--dir-ext',
        action='append', dest='dir_exts', default=dir_exts, metavar='EXT',
        help='Add to list of repo dirname extensions (default %(default)s)')

    recursion.add_argument(
        '--symlinks-fd',
        type=int, default=None, metavar='FD',
        help='FD to write encountered symlinks to')
    recursion.add_argument(
        '-L', '--symlinks-follow',  # -L chosen to match BSD convention for following all symlinks
        action='store_true', default=False,
        help='Follow symlinks to find repos')
    recursion.add_argument(
        '--warn-no-flagfiles',
        action='store_true', default=False,
        help='Outside repos, warn if no control flag-files are seen (probably want --obey-skips too)')
    recursion.add_argument(
        '--obey-skips',
        action='store_true', default=False,
        help='Obey .skip-children files')

    skipping.add_argument(
        '--skip-archived', '--no-archived', '-A',
        action='store_true', default=False,
        help='Skip repos we believe are "archived"')
    skipping.add_argument(
        '--require-repo-bool-true',
        action='append', metavar='GitCfgItem', default=[],
        help='Only report repos with this git config bool true')
    skipping.add_argument(
        '--require-repo-bool-false',
        action='append', metavar='GitCfgItem', default=[],
        help='Only report repos with this git config bool explicitly false')
    skipping.add_argument(
        '--require-repo-bool-not-true',
        action='append', metavar='GitCfgItem', default=[],
        help='Only report repos with this git config bool false-or-absent')
    skipping.add_argument(
        '-F', '--meta-file',
        type=str, default=None, metavar='FN',
        help='filename relative to repo meta-dir to require to exist, to print')
    skipping.add_argument(
        '-e', '--exists-file',
        type=str, default=None, metavar='FN',
        help='filename relative to repo working tree to require to exist, to print, including file')
    skipping.add_argument(
        '-E', '--exists-pattern',
        type=str, default=None, metavar='GLOB',
        help='Pattern to match inside a repo to repo; if no * or ? present, \'**/\' is implicitly preficed')

    loudness.add_argument(
        '-q', '--quiet',
        action='store_true', default=False,
        help='Do not report repos')
    loudness.add_argument(
        '-v', '--verbose',
        action='store_true', dest='verbose', default=False,
        help='Be more verbose (see repo type too)')

    reporting.add_argument(
        '-p', '--print-file',
        action='store_true', default=False,
        help='make -F behave like -e and print the path to the file')
    reporting.add_argument(
        '--external-file',
        action='append', metavar='FN', default=[],
        help='Report these files found outside of repos')
    reporting.add_argument(
        '-c', '--count',
        action='store_true', dest='want_count', default=False,
        help='Print a count to stderr at the end')
    reporting.add_argument(
        '-0', '--null-terminate', '--null',
        action='store_true', default=False,
        help='End each record with ASCII NUL instead of NL')

    parser.add_argument(
        '--git-cmd',
        type=str, default="git", help=argparse.SUPPRESS)
    parser.add_argument(
        'top_dirs',
        type=str, nargs='*', metavar='DIR',
        help='top level directories')

    options = parser.parse_args(args=args)

    if not options.top_dirs:
        options.top_dirs = ['.']
    else:
        # Warn about unusable arguments, but still attempt the walk.
        found_any = False
        for d in options.top_dirs:
            candidate = pathlib.Path(d)
            if candidate.is_dir():
                found_any = True
            elif candidate.exists():
                print(f'{argv0}: warning: command-line arg {d!r} exists but is not a directory', file=sys.stderr)
            else:
                print(f'{argv0}: warning: command-line arg {d!r} does not exist', file=sys.stderr)
        if not found_any:
            print(f'{argv0}: did you mean to use -e to specify a pattern relative to the repo?', file=sys.stderr)

    # --exclude-dir entries are compared against bare directory names.
    bad = [d for d in options.exclude_dirs if os.path.sep in d]
    if bad:
        raise Exit(f'directory separator found in an \'--exclude-dir\' items {bad!r}, will never match', status=Exit.USAGE)

    unknown = set(options.only_repo_types) - known_types
    if unknown:
        display_form = ' '.join(sorted(unknown))
        raise Exit(f'unknown repo types to limit to: {display_form}')

    walker = Walker(options)
    for top in options.top_dirs:
        walker.find_under(top)
    if options.want_count:
        print(f'{argv0}: saw {walker.shown_count} repos', file=sys.stderr)
    return 0
if __name__ == '__main__':
    # Program name for diagnostics: last '/' component of argv[0], without a
    # trailing '.py'.
    argv0 = sys.argv[0].rsplit('/')[-1]
    if len(argv0) >= 4 and argv0.endswith('.py'):
        argv0 = argv0[:-3]
    try:
        rv = _main(sys.argv[1:], argv0=argv0)
    except Exit as e:
        # Clean exit: one stderr line per message, no stack trace.
        for arg in e.args:
            print(f'{argv0}: {arg}', file=sys.stderr)
        sys.exit(e.status)
    except KeyboardInterrupt:
        print(f'\n{argv0}: KeyboardInterrupt', file=sys.stderr)
        # 128 + SIGINT (2, per POSIX)
        # This is not "right", as we're not exiting on a signal, but from the perspective of a shell, it will be "roughly right"
        sys.exit(130)
    sys.exit(rv)
# vim: set ft=python sw=4 expandtab :
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment