@Leedehai
Last active July 11, 2020 17:02
Cross-repository check: verifies that shared files have identical content across sibling repositories, and that each repository's master branch is committed and synced with its remote.
#!/usr/bin/env python3
# https://gist.github.com/Leedehai/96ae9d03d3d34ec74d59f33087ba5e9f
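# Assumed layout (inferred from the THIS_DIR usage below; the script file name
# here is only illustrative):
#   workspace/
#     check_repos.py   <- this script
#     arcl/  auto3/  gotap/  protobus/
# Run with no arguments, e.g.: python3 check_repos.py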
import os, sys
import asyncio
import hashlib
import subprocess
from collections import OrderedDict
from pathlib import Path
from typing import Tuple, Union
THIS_DIR = os.path.dirname(__file__)
GIT = "git"
CHECKED_REPOS = sorted([
    "arcl",
    "auto3",
    "gotap",
    "protobus",
])
# key: str - path relative to repo
# val: boolean - whether it must exist in all repos (False: optional
#      everywhere, though existing copies must still match), or
#      List - repos that must have it
CHECKED_FILES = OrderedDict([ # under repo
    (".vscode/settings.json", True),
    (".clang-format", [ "arcl", "protobus" ]),
    (".editorconfig", [ "gotap" ]),
    (".gitignore", True),
    (".pylintrc", True),
    (".style.yapf", True),
    ("LICENSE.txt", True),
    ("auto.py", [ "arcl", "protobus" ]),
    ("deps/.gitignore", [ "arcl", "auto3", "protobus" ]),
    ("deps/README.md", [ "arcl", "auto3", "protobus" ]),
    ("deps/deps.py", [ "arcl", "auto3", "protobus" ]),
    ("deps/schema.py", [ "arcl", "protobus" ]),
    ("zen/README.md", False),
    ("zen/licenses/README.md", [ "arcl", "protobus" ]),
    ("zen/logo/canvas.html", [ "arcl", "protobus" ]),
])
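# Colored terminal output helpers: print_ok uses a green "ok:" prefix,
# print_error a yellow "[x]" prefix; \x1b[2m dims the secondary message.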
def print_ok(main_message, secondary_message=""):
    print("\x1b[38;5;155mok: %s\x1b[0m \x1b[2m%s\x1b[0m" % (
        main_message, secondary_message
    ))
def print_error(main_message, secondary_message=""):
    print("\x1b[33m[x] %s\x1b[0m \x1b[2m%s\x1b[0m" % (
        main_message, secondary_message
    ))
# Handle Python's nasty str vs. bytes vs. unicode mess.
def _ensure_str(s):
    if type(s) == bytes:
        return s.decode()
    return s  # no 'unicode' type in Python 3
def _execute(program, args_string):
    return subprocess.call([ program ] + args_string.split())
def _execute_get_output(
        program: str, args_string: str) -> Tuple[str, int]:
    try:
        # We do not handle process errors - let them print out to the console
        output = _ensure_str(subprocess.check_output(
            [ program ] + args_string.split(), stderr=subprocess.STDOUT))
        return output, 0
    except subprocess.CalledProcessError as e:
        return _ensure_str(e.output), e.returncode
async def _async_execute_get_output(
        program: str, args_string: str) -> Tuple[str, int]:
    p = await asyncio.create_subprocess_exec(
        program, *args_string.split(),
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await p.communicate()
    if p.returncode == 0:
        return _ensure_str(stdout), 0
    else:
        return _ensure_str(stdout) + _ensure_str(stderr), p.returncode
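# SHA-1 is used only to detect content differences between copies of a file,
# not for any security purpose.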
def _compute_file_hash(filepath: Path) -> str:
    hash_obj = hashlib.sha1()
    with open(filepath, 'rb') as f:
        hash_obj.update(f.read())
    return hash_obj.hexdigest()
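# Width of the longest checked path; used to right-align the per-repo tick
# marks under the header row printed by check_shared_files().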
CHECKER_LINE_RIGHTSHIFT = max([ len(k) for k in CHECKED_FILES ])
def _make_checker_line(
        repos: list,
        checked_repos: list,
        occupied_spaces: int) -> str:
    content_line = ""
    for repo in repos:  # iterate the passed-in repo list, in order
        if repo in checked_repos:
            content_line += " ✓   "
        else:
            content_line += "     "
    return "%s%s" % (
        " " * (CHECKER_LINE_RIGHTSHIFT - occupied_spaces), content_line)
def _compare_repos_for_one_file(
        repos: list,
        relative_filepath_str: str,
        must_exist: Union[bool, list]) -> bool:
    repos_having_file = []
    repos_missing_necessary_file = []  # list of repos
    hashstrs = {}  # key: hash string, value: repos
    for repo in repos:
        filepath = Path(
            THIS_DIR, repo, relative_filepath_str.replace('/', os.sep))
        if filepath.is_dir():
            raise RuntimeError("Is a directory: %s" % filepath)
        if not filepath.is_file():
            if (type(must_exist) == bool and must_exist) \
                    or (type(must_exist) == list and repo in must_exist):
                repos_missing_necessary_file.append(repo)
            continue
        repos_having_file.append(repo)
        hashstr = _compute_file_hash(filepath)
        hashstrs.setdefault(hashstr, []).append(repo)
    has_missing_necessary = len(repos_missing_necessary_file) > 0
    has_diff = len(hashstrs) > 1
    is_ok = (not has_missing_necessary) and (not has_diff)
    if is_ok:
        print_ok(relative_filepath_str, _make_checker_line(
            repos, repos_having_file, len(relative_filepath_str)))
    else:
        print_error(relative_filepath_str, _make_checker_line(
            repos, repos_having_file, len(relative_filepath_str)))
        if has_missing_necessary:
            print(" missing in: %s" % ", ".join(repos_missing_necessary_file))
        if has_diff:
            print(" diff exists:")
            for hashstr in hashstrs:
                corresponding_repos = hashstrs[hashstr]
                print(" %s: %s" % (
                    hashstr[:7], ", ".join(corresponding_repos)))
    return is_ok
# Check that the shared files have the same content.
def check_shared_files() -> bool:
    print("\n===== Check shared files: same content =====")
    error_count = 0
    print((6 + CHECKER_LINE_RIGHTSHIFT) * " "
          + "\x1b[2m%s\x1b[0m" % " ".join([
              (("%-4s" % e) if len(e) <= 4 else (e[:3] + ".")) for e in CHECKED_REPOS]))
    for relative_filepath_str in sorted(CHECKED_FILES):
        res = _compare_repos_for_one_file(
            CHECKED_REPOS, relative_filepath_str,
            CHECKED_FILES[relative_filepath_str])
        if not res:
            error_count += 1
    print("Error count: %d" % error_count)
    return error_count == 0
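# Check one repository: fail if the working tree has uncommitted changes, or
# if the local master tip differs from refs/heads/master on the remote
# (queried with 'git ls-remote --heads').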
async def check_one_repo_master(repo_name: str, repo_path: Path) -> bool:
    git_status, _ = _execute_get_output(
        GIT, f"-C {repo_path} status --porcelain")
    dirty_item_count = len([
        l for l in git_status.split('\n') if len(l.strip()) ])
    if dirty_item_count != 0:
        print_error("%s: dirty" % repo_name)
        print(" uncommitted: %d" % dirty_item_count)
        return False
    git_show, _ = _execute_get_output(
        GIT, f"--no-pager -C {repo_path} show --no-patch --format=%H master")
    local_master_sha = git_show.strip()
    remote_master_sha = "N/A"
    git_ls_remote, ret = await _async_execute_get_output(
        GIT, f"--no-pager -C {repo_path} ls-remote --heads")
    if ret != 0:
        remote_master_sha = "\x1b[33;1m(network error)\x1b[0m"
    else:
        for line in [ l for l in git_ls_remote.split('\n') if ('\t' in l) ]:
            line_split = line.split()
            if line_split[-1] == "refs/heads/master":
                remote_master_sha = line_split[0]
                break
    if local_master_sha != remote_master_sha:
        print_error(
            "%s: master branch: local != remote" % repo_name,
            "didn't check which is newer")
        print(" local: %s" % local_master_sha)
        print(" remote: %s" % remote_master_sha)
        return False
    print_ok("%s: clean and synced with remote" % repo_name)
    return True
# Check that each repository's master branch is fully committed and synced
# with its remote (it does NOT check which end is newer if out of sync).
async def check_repo_master_committed_and_pushed() -> bool:
    print("\n===== Check repositories: master branch tips =====")
    error_count = 0
    coroutines = []
    for repo_name in CHECKED_REPOS:
        repo_path = Path(THIS_DIR, repo_name)
        coroutines.append(check_one_repo_master(repo_name, repo_path))
    res = await asyncio.gather(*coroutines)
    error_count = sum([not e for e in res])
    print("Error count: %d" % error_count)
    return error_count == 0
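# Entry point: verify every checked repository exists, run both check suites,
# and return a process exit code (0 on success, 1 otherwise).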
def main() -> int:
    print("Check in repos:\n %s" % ", ".join(CHECKED_REPOS))
    for repo_name in CHECKED_REPOS:
        repo_path = Path(THIS_DIR, repo_name)
        if not repo_path.is_dir():
            sys.exit("[Error] repository not found: %s" % repo_path)
    err_cnt = 0
    err_cnt += 0 if check_shared_files() else 1
    err_cnt += 0 if asyncio.run(check_repo_master_committed_and_pushed()) else 1
    if err_cnt == 0:
        comment = "No error detected"
    else:
        comment = "Errors detected: %d, see above" % err_cnt
    print("\n%s." % comment)
    return 0 if err_cnt == 0 else 1
if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        sys.exit(1)