Last active
March 25, 2025 02:35
-
-
Save philpennock/4dbeb8e053c77e6e3a6507975c0ff4ff to your computer and use it in GitHub Desktop.
Find VCS repos under the cwd and report their paths
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# /// script | |
# requires-python = ">=3.7" | |
# /// | |
# | |
# I think this should work in 3.6, when f-strings were introduced, | |
# but I don't have 3.6 around and 3.7 is the earliest I can test with, | |
# so for PEP 723 script metadata purposes I am declaring >= 3.7. | |
""" | |
find_to_repo: find all dirs under the reference point which are repos | |
Stop at repos, don't descend into the repos. | |
Handling submodules etc is explicitly out of scope. | |
Use --help to see help. | |
Assumes under current directory if no directories given. | |
""" | |
# If updating this, then also update the Public gist: | |
# * [email protected]:4dbeb8e053c77e6e3a6507975c0ff4ff.git | |
# * https://gist.github.com/philpennock/4dbeb8e053c77e6e3a6507975c0ff4ff | |
__author__ = '[email protected] (Phil Pennock)' | |
import argparse | |
import dataclasses | |
import os | |
import pathlib | |
import shlex | |
import stat | |
import subprocess | |
import sys | |
# Internal logic, deliberately hard-coded: a directory holding all of these
# sub-directories is treated as a bare git repo.
GIT_BARE_NEEDALL_DIRS = frozenset({'info', 'objects', 'refs'})

# Flag-file names shared with some update tooling.
SKIP_CHILDREN_FILENAME = '.skip-children'  # selectively skip children
SKIP_ALL_FILENAME = '.skip-updates'  # "prune here"
# A non-repo directory holding any of these does not trigger the
# --warn-no-flagfiles warning.
REPO_RECURSE_FLAGFILES = frozenset({'.update-children', SKIP_ALL_FILENAME})
class Error(Exception):
    """Root of the find_to_repo exception hierarchy."""
class Exit(Error):
    """An error that ends the program cleanly, without a stack trace.

    The positional arguments are the messages to show; ``status`` is the
    process exit status to use (1 unless given).
    """

    # Exit status used for command-line usage errors.
    USAGE = 64

    status: int

    def __init__(self, *args, status: int = 1, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.status = status
@dataclasses.dataclass
class RepoFilter:
    """One git-config boolean requirement a repo must satisfy to be reported."""

    # Name of the git config option to read.
    Bool: str
    # Value the option must evaluate to for the repo to pass.
    Need: bool
    # Value assumed when the option is not set in the repo.
    Default: bool
class Walker(object):
    """Walk directory trees and print the version-control repos found in them.

    An instance captures the parsed command-line options and keeps a running
    count (``shown_count``) of everything reported.  Call ``find_under`` once
    per top-level directory.
    """

    def __init__(self, options):
        """Copy the walk configuration out of the parsed options.

        Args:
            options: namespace carrying every attribute read below (the
                argparse result built by ``_main``).
        """
        # What identifies a repo.
        self.repo_dirs = set(options.repo_dirs)
        self.bundle_exts = set(options.bundle_exts)
        self.dir_exts = set(options.dir_exts)
        self.only_show = set(options.only_repo_types)
        self.use_only_show = bool(self.only_show)
        # Recursion controls.
        self.exclude_paths = set(options.exclude_paths)
        self.ignore_paths = set(options.ignore_paths)
        self.exclude_dirs = set(options.exclude_dirs)
        self.external_files = set(options.external_file)
        # Requirements a repo must meet to be printed.
        self.required_meta_file = options.meta_file
        self.required_exists_file = options.exists_file
        # Reporting controls.
        self.print_file = options.print_file
        self.quiet = options.quiet
        self.verbose = options.verbose
        self.symlinks_file = None
        self.symlinks_follow = options.symlinks_follow
        self.warn_no_flagfiles = options.warn_no_flagfiles
        self.obey_skips = options.obey_skips
        self.skip_archived = options.skip_archived
        self.git_command = options.git_cmd
        self.git_repo_set_bool_filters(options)
        self.shown_count = 0
        if options.null_terminate:
            self.field_separator = '\0'
            self.print_end = '\0'
        else:
            self.field_separator = '\t'
            self.print_end = '\n'
        if options.symlinks_fd is not None and options.symlinks_fd >= 0:
            # Line-buffered so symlink reports appear as they are found.
            self.symlinks_file = os.fdopen(options.symlinks_fd, mode='wt', buffering=1, encoding='UTF-8')
        # A pattern containing a glob character is matched relative to the
        # repo root; a plain name is searched for at any depth in the repo.
        self.required_exists_in_any_dir = None
        self.required_exists_pattern = None
        if options.exists_pattern:
            if '*' in options.exists_pattern or '?' in options.exists_pattern:
                self.required_exists_pattern = options.exists_pattern
            else:
                self.required_exists_in_any_dir = options.exists_pattern

    def _report_one(self, full_repo: str, ext) -> None:
        """Count one result and, unless quiet, print it.

        Args:
            full_repo: the path to print.
            ext: the repo-type label shown in verbose mode; either a string
                or a zero-argument callable returning one (only called when
                the label is actually printed).
        """
        self.shown_count += 1
        if self.quiet:
            return
        if not self.verbose:
            print(full_repo, end=self.print_end)
            return
        if callable(ext):
            ext = ext()
        print(f'{ext}{self.field_separator}{full_repo}', end=self.print_end)

    def _report_one_resolving(self, repo_root: str, file_within: str, vcs_subdir: str) -> None:
        """Report a repo, or with --print-file the resolved file inside it.

        Args:
            repo_root: path of the repo as seen by the walk.
            file_within: path of the file which satisfied the requirement.
            vcs_subdir: repo-type label for verbose output.
        """
        if not self.print_file:
            self._report_one(repo_root, vcs_subdir)
            return
        real = os.path.realpath(file_within)
        if not repo_root.startswith('./'):
            self._report_one(real, vcs_subdir)
            return
        # The walk is producing './'-relative paths, so try to stay relative,
        # falling back to the absolute path if the file is outside the cwd.
        relative = os.path.relpath(real)
        if relative.startswith('../'):
            relative = real
        elif not relative.startswith('/'):
            relative = './' + relative
        self._report_one(relative, vcs_subdir)

    @staticmethod
    def _read_gitdir_pointer(worktree_root: str, stub_path: str):
        """Return the git-dir named by a ``.git`` file, or None if unreadable.

        The value after ``gitdir:`` is taken whole (so paths containing spaces
        survive) and a relative value is resolved against the directory
        holding the ``.git`` file rather than the process cwd.
        """
        marker = 'gitdir:'
        try:
            with open(stub_path) as stub:
                for line in stub:
                    if not line.startswith(marker):
                        continue
                    target = line[len(marker):].strip()
                    if target:
                        # os.path.join discards worktree_root for absolute targets.
                        return os.path.join(worktree_root, target)
        except (OSError, ValueError):
            # Deliberately best-effort: an unreadable or undecodable stub
            # just means we do not treat this directory as a worktree.
            pass
        return None

    def _checkout_report_path(self, root: str, meta_subdir: str):
        """Return what to print for a checkout at root, or None for nothing.

        Applies the --meta-file / --exists-file requirements; when only
        --exists-pattern is in force the repo itself is not printed.
        """
        printable = root
        if self.required_exists_file:
            printable = os.path.join(root, self.required_exists_file)
        if self.required_meta_file:
            meta_path = os.path.join(root, meta_subdir, self.required_meta_file)
            if not os.path.exists(meta_path):
                return None
            return meta_path if self.print_file else printable
        if self.required_exists_file:
            return printable if os.path.exists(printable) else None
        if self.required_exists_pattern or self.required_exists_in_any_dir:
            return None
        return printable

    def _pattern_matches(self, root: str) -> list:
        """Return sorted --exists-pattern hits inside the repo at root."""
        base = pathlib.Path(root)
        found = []
        # Kept as two independent lookups so that -e and -E can be combined.
        if self.required_exists_in_any_dir is not None:
            found += sorted(base.rglob(self.required_exists_in_any_dir))
        if self.required_exists_pattern is not None:
            found += sorted(base.glob(self.required_exists_pattern))
        return found

    def _report_checkout(self, root: str, present: set) -> None:
        """Report a checkout dir which contains meta dirs (e.g. ``.git/``).

        Args:
            root: the checkout directory.
            present: the configured repo dir names found directly in root.
        """
        wanted = present & self.only_show if self.use_only_show else present
        if not wanted:
            return
        # Only pick a meta dir which really exists in this directory.
        meta_subdir = min(wanted)
        if self.skip_archived and meta_subdir == '.git' and self.git_repo_is_archived(root):
            return
        if not self.git_repo_meets_bool_filters(root):
            return
        # Sorted so that verbose output is stable from run to run.
        label = ','.join(sorted(present))
        report = self._checkout_report_path(root, meta_subdir)
        if report is not None:
            self._report_one(report, label)
        # pathlib drops a leading './'; put it back only if the walk had one,
        # so pattern hits look like every other path we print.
        dot_relative = root == '.' or root.startswith('./')
        for item in self._pattern_matches(root):
            item = str(item)
            if dot_relative and not item.startswith('/'):
                item = './' + item
            self._report_one(item, label)

    def _report_worktree(self, root: str) -> bool:
        """Report a directory holding a ``.git`` FILE (worktree, submodule, ...).

        Returns:
            False if the repo was rejected by --skip-archived or the bool
            filters, in which case the caller abandons this directory;
            True otherwise.
        """
        vcs_subdir = '.git'
        if self.skip_archived and self.git_repo_is_archived(root):
            return False
        if not self.git_repo_meets_bool_filters(root):
            return False
        git_dir = self._read_gitdir_pointer(root, os.path.join(root, vcs_subdir))
        if git_dir is None:
            return True
        if self.required_exists_file:
            # This is relative to the working tree, so 'root' is correct.
            wanted = os.path.join(root, self.required_exists_file)
            if os.path.exists(wanted):
                self._report_one(wanted, vcs_subdir)
        elif self.required_meta_file:
            # NB: 'config' does not exist inside the real git-dir for a
            # working-tree, that's shared-only; hence the commondir check.
            direct = os.path.join(git_dir, self.required_meta_file)
            common_dir_ptr = os.path.join(git_dir, 'commondir')
            if os.path.exists(direct):
                self._report_one_resolving(root, direct, vcs_subdir)
            elif os.path.exists(common_dir_ptr):
                with open(common_dir_ptr) as ptr:
                    common_dir = ptr.read().strip()
                shared = os.path.join(git_dir, common_dir, self.required_meta_file)
                if os.path.exists(shared):
                    self._report_one_resolving(root, shared, vcs_subdir)
        else:
            self._report_one(root, vcs_subdir)
        return True

    def _bare_repo_reportable(self, repo_path: str) -> bool:
        """Return whether a bare repo passes --meta-file / --exists-file."""
        if self.required_meta_file and not os.path.exists(os.path.join(repo_path, self.required_meta_file)):
            return False
        # A working-tree file can't exist inside a bare repo.
        return not self.required_exists_file

    def find_under(self, top):
        """Walk the tree rooted at top and report each repo found.

        For our purposes, a repo is one of:
          1. A file (matched by extension, e.g. a fossil bundle)
          2. A directory with a flag-directory within it
          3. A directory with a flag-FILE within it (.git worktree, etc)
          4. A directory named a particular way (bare .git repos)
          5. Fallback for bare git repos not named that way

        The walk does not descend into directories reported under cases 2, 4
        and 5.

        Args:
            top: directory to start from; relative exclude/ignore prefixes
                are taken as relative to it.
        """
        sep = os.path.sep
        # os.path.join keeps these equal to the roots os.walk produces, even
        # when top ends with a separator.
        exclude = {x if sep in x else os.path.join(top, x) for x in self.exclude_paths}
        ignore_paths = {
            x if (sep in x or x == top) else os.path.join(top, x)
            for x in self.ignore_paths
        }
        for root, dirs, files in os.walk(top, topdown=True, followlinks=self.symlinks_follow):
            if root in exclude:
                del dirs[:]
                continue
            if root in ignore_paths:
                continue
            # In-place edits of dirs are how os.walk(topdown=True) is pruned.
            dirs[:] = [d for d in dirs if d not in self.exclude_dirs]

            is_repo_thisdir = False

            # Case 1: repo bundle files.  No --meta-file support for files.
            for bundle in files:
                ext = os.path.splitext(bundle)[1]
                if ext not in self.bundle_exts:
                    continue
                is_repo_thisdir = True
                full_repo = os.path.join(root, bundle)
                if self.use_only_show and ext not in self.only_show:
                    continue
                if self.required_meta_file or self.required_exists_file:
                    continue
                if full_repo in ignore_paths:
                    continue
                self._report_one(full_repo, ext)

            # Case 2: a checkout dir containing a meta dir, eg a .git/ sub-dir.
            # ignore_paths needs no check here: the repo path is root, which
            # was tested at the top of the loop.
            dirs_set = set(dirs)
            present = self.repo_dirs & dirs_set
            if present:
                is_repo_thisdir = True
                self._report_checkout(root, present)
                del dirs[:]

            # Case 3: a .git file.  For us to see it, _probably_ a worktree.
            # NOTE(review): this case neither marks the directory as a repo
            # nor prunes the walk, unlike the other cases - confirm intended.
            if '.git' in files and not self._report_worktree(root):
                continue

            # Case 4: a dir which is a "git bare repo" or moral equivalent,
            # hinted at by the filename extension.
            byname = {d for d in dirs if os.path.splitext(d)[1] in self.dir_exts}
            if byname:
                for d in sorted(byname):
                    ext = os.path.splitext(d)[1]
                    if self.use_only_show and ext not in self.only_show:
                        continue
                    full_repo = os.path.join(root, d)
                    if self._bare_repo_reportable(full_repo):
                        self._report_one(full_repo, ext)
                dirs[:] = [d for d in dirs if d not in byname]

            # Case 5: a git bare repo, not named to have .git extension.
            # dirs_set is the listing from before any pruning above.
            if GIT_BARE_NEEDALL_DIRS <= dirs_set:
                is_repo_thisdir = True
                if self.skip_archived and self.git_repo_is_archived(root):
                    pass
                elif not self.git_repo_meets_bool_filters(root):
                    pass
                elif self.use_only_show and '.git' not in self.only_show:
                    pass
                elif os.path.dirname(root) in ignore_paths:
                    # if we skip a git repo foo, then skip the .git dir inside it too
                    pass
                elif self._bare_repo_reportable(root):
                    self._report_one(root, '.git')
                del dirs[:]

            if self.warn_no_flagfiles and not is_repo_thisdir:
                if not set(files).intersection(REPO_RECURSE_FLAGFILES):
                    print(f'# WARNING: not a repo, no flag-files: {root}', file=sys.stderr)
                # Ideally, we'd detect SKIP_CHILDREN_FILENAME and in sub-dirs under that, suppress this warning,
                # by mutating a context only passed to child dirs.  But the API doesn't offer us that context.
                # So instead I added --obey-skips: a crude bodge.

            if self.external_files and not is_repo_thisdir:
                # This does not use --quiet because we've explicitly asked for these specific files;
                # if don't want to see, don't ask.
                # Use-case: use --quiet in combination with this, to find control files which exist outside of repos.
                # The record terminator is still honoured so that -0 consumers get consistent output.
                for filename in sorted(set(files).intersection(self.external_files)):
                    print(os.path.join(root, filename), end=self.print_end)

            if self.obey_skips and SKIP_ALL_FILENAME in files:
                del dirs[:]
            if self.obey_skips and SKIP_CHILDREN_FILENAME in files:
                with open(os.path.join(root, SKIP_CHILDREN_FILENAME)) as skip_file:
                    stripped = (line.rstrip() for line in skip_file)
                    skip = {entry for entry in stripped if entry and not entry.startswith('#')}
                dirs[:] = [d for d in dirs if d not in skip]

            if self.symlinks_file is not None and not is_repo_thisdir:
                # It's already in dirs, so there's already been a stat ... I need to decide when to switch
                # to that newer walk API which caches this
                for e in dirs:
                    fn = os.path.join(root, e)
                    st = os.lstat(fn)
                    if stat.S_ISLNK(st.st_mode):
                        print(f'{fn} -> {os.readlink(fn)}', file=self.symlinks_file)

    def git_config_get_bool(self, repo_path: str, option: str, default: bool) -> bool:
        """Return a repo-local git config boolean, or default when unset.

        Args:
            repo_path: directory handed to ``git -C``.
            option: name of the git config option.
            default: value to return when git prints nothing.

        Raises:
            Exit: if the git command cannot be run, or prints something other
                than ``true`` / ``false``.
        """
        # This is not as efficient as normal for this tool, but it's "correct"
        # We could consider cheating and reading files directly, but I think the --skip-archived case is rare enough
        # that I will live with the overhead.
        # (On my 2019 vintage laptop 2025-02: 217 repos, 1 archived, time take goes from around 0.115s to around
        # 0.593s; noticeable but tolerable.)
        #
        # 2025-02: this should be: git config get --type bool --local -- <option>
        # but the switch of git-config to taking subcommands is still new enough and I don't have current git on all
        # the places I use find_to_repo so for now we stick to the older invocation syntax.
        # Which _for this case_ is just "--get" instead of "get".
        cmdline = [self.git_command, "-C", repo_path, "config", "--get", "--type", "bool", "--local", "--", option]
        try:
            completed = subprocess.run(cmdline, capture_output=True)
        except OSError as err:
            raise Exit(f'unable to run {self.git_command!r}: {err}') from err
        # errors='replace' so unexpected bytes reach the parse failure below
        # instead of raising UnicodeDecodeError.
        value = completed.stdout.rstrip().decode('US-ASCII', errors='replace')
        if value == '':
            return default
        if value == 'true':
            return True
        if value == 'false':
            return False
        # shlex.join only exists from Python 3.8; this file supports 3.7.
        shown = ' '.join(shlex.quote(word) for word in cmdline)
        raise Exit(f'failed to parse {value!r} as return value from: {shown}')

    def git_repo_is_archived(self, path: str) -> bool:
        """Return whether or not the repo should be considered "archived".

        The concept could vary, but for our purposes it's "the forge which is
        upstream has marked it archived", and we don't have forge
        communication in this tool.  Instead, we use the git config bool
        pdp.forge-has-archived as our current indicator.  What manages that
        setting is not something we care about.
        """
        return self.git_config_get_bool(path, 'pdp.forge-has-archived', False)

    def git_repo_set_bool_filters(self, options: argparse.Namespace) -> None:
        """Build the list of git-config bool requirements from the options.

        Each entry records the option name, the value it must have, and the
        value to assume when the option is unset.
        """
        filters = []
        # (option names, value needed, value assumed when unset)
        for names, need, default in (
            (options.require_repo_bool_true, True, False),
            (options.require_repo_bool_false, False, True),
            (options.require_repo_bool_not_true, False, False),
        ):
            for item in names:
                filters.append(RepoFilter(Bool=item, Need=need, Default=default))
        self._git_bool_filters = filters
        self._has_git_bool_filters = bool(filters)

    def git_repo_meets_bool_filters(self, path: str) -> bool:
        """Return True if the repo at path satisfies every configured filter.

        With no filters configured this returns True without running git.
        """
        if not self._has_git_bool_filters:
            return True
        return all(
            self.git_config_get_bool(path, repo_filter.Bool, repo_filter.Default) == repo_filter.Need
            for repo_filter in self._git_bool_filters
        )
def _main(args, argv0):
    """Parse the command line, walk each top directory, and report repos.

    Args:
        args: command-line arguments, without the program name.
        argv0: program name used as the prefix of diagnostics on stderr.

    Returns:
        0 once every top directory has been walked.

    Raises:
        Exit: on usage errors (an --exclude-dir value containing a directory
            separator, or an unknown repo type).
    """
    # __doc__ is None when docstrings are stripped (python -OO).
    doc_lines = (__doc__ or '').split('\n')
    parser = argparse.ArgumentParser(
        add_help=False,
        description='\n'.join(line for line in doc_lines if '--help' not in line),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS, help=argparse.SUPPRESS)

    # FIXME: Technically CVSROOT is a collection of repos as it's the server side,
    # so we need to then list all the sibling directories as CVS stores (not check-outs).
    repo_stops = ['.git', '.hg', 'CVS', 'CVSROOT', '.bzr', '.svn', '.sl']
    bundle_exts = ['.fossil']
    dir_exts = ['.git']
    known_types = frozenset(set(repo_stops) | set(bundle_exts) | set(dir_exts))

    rctrls = parser.add_argument_group('Recursion Controls')
    vcstps = parser.add_argument_group('VCS Type Controls')
    skpctl = parser.add_argument_group('Repo Skip Controls')
    report = parser.add_argument_group('Reporting Controls')
    qvloud = report.add_mutually_exclusive_group()

    rctrls.add_argument('-x', '--exclude-path', '--exclude',  # --exclude included here for backwards compatibility
                        action='append', dest='exclude_paths', default=[], metavar='PREFIX',
                        help='skip anything under one of these prefices')
    rctrls.add_argument('-X', '--exclude-dir',
                        action='append', dest='exclude_dirs', default=[], metavar='DIR',
                        help='skip looking inside any dir named for one of these')
    rctrls.add_argument('-i', '--ignore-paths',
                        action='append', dest='ignore_paths', default=[], metavar='PREFIX',
                        help='ignore these paths being repos, look inside them')
    # choices is a sorted list so that the %(choices)s help text is stable.
    vcstps.add_argument('-o', '--only-repo-type',
                        action='append', dest='only_repo_types', metavar='TYPE+',
                        choices=sorted(known_types), default=[],
                        help='only print for these repo dirs (available: %(choices)s) (default: %(default)s)')
    vcstps.add_argument('-r', '--repo-dir',
                        action='append', dest='repo_dirs', default=repo_stops, metavar='DIR',
                        help='Add to list of repo dirs (default %(default)s)')
    vcstps.add_argument('-f', '--bundle-ext',
                        action='append', dest='bundle_exts', default=bundle_exts, metavar='EXT',
                        help='Add to list of repo file extensions (default %(default)s)')
    vcstps.add_argument('-d', '--dir-ext',
                        action='append', dest='dir_exts', default=dir_exts, metavar='EXT',
                        help='Add to list of repo dirname extensions (default %(default)s)')
    rctrls.add_argument('--symlinks-fd',
                        type=int, default=None, metavar='FD',
                        help='FD to write encountered symlinks to')
    rctrls.add_argument('-L', '--symlinks-follow',  # -L chosen to match BSD convention for following all symlinks
                        action='store_true', default=False,
                        help='Follow symlinks to find repos')
    rctrls.add_argument('--warn-no-flagfiles',
                        action='store_true', default=False,
                        help='Outside repos, warn if no control flag-files are seen (probably want --obey-skips too)')
    rctrls.add_argument('--obey-skips',
                        action='store_true', default=False,
                        help='Obey .skip-children files')
    skpctl.add_argument('--skip-archived', '--no-archived', '-A',
                        action='store_true', default=False,
                        help='Skip repos we believe are "archived"')
    skpctl.add_argument('--require-repo-bool-true',
                        action='append', metavar='GitCfgItem', default=[],
                        help='Only report repos with this git config bool true')
    skpctl.add_argument('--require-repo-bool-false',
                        action='append', metavar='GitCfgItem', default=[],
                        help='Only report repos with this git config bool explicitly false')
    skpctl.add_argument('--require-repo-bool-not-true',
                        action='append', metavar='GitCfgItem', default=[],
                        help='Only report repos with this git config bool false-or-absent')
    skpctl.add_argument('-F', '--meta-file',
                        type=str, default=None, metavar='FN',
                        help='filename relative to repo meta-dir to require to exist, to print')
    skpctl.add_argument('-e', '--exists-file',
                        type=str, default=None, metavar='FN',
                        help='filename relative to repo working tree to require to exist, to print, including file')
    skpctl.add_argument('-E', '--exists-pattern',
                        type=str, default=None, metavar='GLOB',
                        help='Pattern to match inside a repo to repo; if no * or ? present, \'**/\' is implicitly preficed')
    qvloud.add_argument('-q', '--quiet',
                        action='store_true', default=False,
                        help='Do not report repos')
    qvloud.add_argument('-v', '--verbose',
                        action='store_true', dest='verbose', default=False,
                        help='Be more verbose (see repo type too)')
    report.add_argument('-p', '--print-file',
                        action='store_true', default=False,
                        help='make -F behave like -e and print the path to the file')
    report.add_argument('--external-file',
                        action='append', metavar='FN', default=[],
                        help='Report these files found outside of repos')
    report.add_argument('-c', '--count',
                        action='store_true', dest='want_count', default=False,
                        help='Print a count to stderr at the end')
    report.add_argument('-0', '--null-terminate', '--null',
                        action='store_true', default=False,
                        help='End each record with ASCII NUL instead of NL')
    parser.add_argument('--git-cmd',
                        type=str, default="git", help=argparse.SUPPRESS)
    parser.add_argument('top_dirs',
                        type=str, nargs='*', metavar='DIR',
                        help='top level directories')
    options = parser.parse_args(args=args)

    if options.top_dirs:
        # Warn about unusable arguments, but still walk whatever was given.
        found_any = False
        for d in options.top_dirs:
            candidate = pathlib.Path(d)
            if candidate.is_dir():
                found_any = True
            elif candidate.exists():
                print(f'{argv0}: warning: command-line arg {d!r} exists but is not a directory', file=sys.stderr)
            else:
                print(f'{argv0}: warning: command-line arg {d!r} does not exist', file=sys.stderr)
        if not found_any:
            print(f'{argv0}: did you mean to use -e to specify a pattern relative to the repo?', file=sys.stderr)
    else:
        options.top_dirs = ['.']

    if options.exclude_dirs:
        # --exclude-dir entries are compared against single directory names.
        bad = [d for d in options.exclude_dirs if os.path.sep in d]
        if bad:
            raise Exit(f'directory separator found in an \'--exclude-dir\' items {bad!r}, will never match', status=Exit.USAGE)

    unknown = set(options.only_repo_types) - known_types
    if unknown:
        display_form = ' '.join(sorted(unknown))
        # Same usage status as the other command-line error above.
        raise Exit(f'unknown repo types to limit to: {display_form}', status=Exit.USAGE)

    walker = Walker(options)
    for top in options.top_dirs:
        walker.find_under(top)
    if options.want_count:
        print(f'{argv0}: saw {walker.shown_count} repos', file=sys.stderr)
    return 0
if __name__ == '__main__':
    # Name used in diagnostics: the last '/'-separated piece of argv[0],
    # minus a trailing '.py' when something is left in front of it.
    argv0 = sys.argv[0].rsplit('/')[-1]
    if len(argv0) >= 4 and argv0.endswith('.py'):
        argv0 = argv0[:-3]
    try:
        rv = _main(sys.argv[1:], argv0=argv0)
    except Exit as e:
        # Clean exit: one stderr line per message, no stack trace.
        for arg in e.args:
            print(f'{argv0}: {arg}', file=sys.stderr)
        sys.exit(e.status)
    except KeyboardInterrupt:
        print(f'\n{argv0}: KeyboardInterrupt', file=sys.stderr)
        # 128 + SIGINT (2, per POSIX)
        # This is not "right", as we're not exiting on a signal, but from the
        # perspective of a shell, it will be "roughly right"
        sys.exit(130)
    sys.exit(rv)
# vim: set ft=python sw=4 expandtab : |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment