Last active
July 11, 2020 17:02
-
-
Save Leedehai/96ae9d03d3d34ec74d59f33087ba5e9f to your computer and use it in GitHub Desktop.
Cross repository check.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# https://gist.github.com/Leedehai/96ae9d03d3d34ec74d59f33087ba5e9f | |
import os, sys | |
import asyncio | |
import hashlib | |
import subprocess | |
from collections import OrderedDict | |
from pathlib import Path | |
from typing import List, Tuple, Union | |
THIS_DIR = os.path.dirname(__file__) | |
GIT = "git" | |
CHECKED_REPOS = sorted([ | |
"arcl", | |
"auto3", | |
"gotap", | |
"protobus", | |
]) | |
# key: str - path relative to repo | |
# val: boolean - whether it must exist in all repo, | |
# or List - repos that must have it | |
CHECKED_FILES = OrderedDict([ # under repo | |
(".vscode/settings.json", True), | |
(".clang-format", [ "arcl", "protobus" ]), | |
(".editorconfig", [ "gotap" ]), | |
(".gitignore", True), | |
(".pylintrc", True), | |
(".style.yapf", True), | |
("LICENSE.txt", True), | |
("auto.py", [ "arcl", "protobus" ]), | |
("deps/.gitignore", [ "arcl", "auto3", "protobus" ]), | |
("deps/README.md", [ "arcl", "auto3", "protobus" ]), | |
("deps/deps.py", [ "arcl", "auto3", "protobus" ]), | |
("deps/schema.py", [ "arcl", "protobus" ]), | |
("zen/README.md", False), | |
("zen/licenses/README.md", [ "arcl", "protobus" ]), | |
("zen/logo/canvas.html", [ "arcl", "protobus" ]), | |
]) | |
def print_ok(main_message, secondary_message=""): | |
print("\x1b[38;5;155mok: %s\x1b[0m \x1b[2m%s\x1b[0m" % ( | |
main_message, secondary_message | |
)) | |
def print_error(main_message, secondary_message=""): | |
print("\x1b[33m[x] %s\x1b[0m \x1b[2m%s\x1b[0m" % ( | |
main_message, secondary_message | |
)) | |
# handle nasty Python's str v.s. bytes v.s. unicode mess | |
def _ensure_str(s): | |
if type(s) == bytes: | |
return s.decode() | |
return s # no 'unicode' type in Python3 | |
def _execute(program, args_string): | |
return subprocess.call([ program ] + args_string.split()) | |
def _execute_get_output( | |
program: str, args_string: List[str]) -> Tuple[str, int]: | |
try: | |
# We do not handle process error - let it print out to the console | |
output = _ensure_str(subprocess.check_output( | |
[ program ] + args_string.split(), stderr=subprocess.STDOUT)) | |
return output, 0 | |
except subprocess.CalledProcessError as e: | |
return _ensure_str(e.output), e.returncode | |
async def _async_execute_get_output( | |
program: str, args_string: List[str]) -> Tuple[str, int]: | |
p = await asyncio.create_subprocess_exec( | |
program, *args_string.split(), | |
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) | |
stdout, stderr = await p.communicate() | |
if p.returncode == 0: | |
return _ensure_str(stdout), 0 | |
else: | |
return _ensure_str(stdout) + _ensure_str(stderr), p.returncode | |
def _compute_file_hash(filepath: Path) -> str: | |
hash_obj = hashlib.sha1() | |
with open(filepath, 'rb') as f: | |
hash_obj.update(f.read()) | |
return hash_obj.hexdigest() | |
CHECKER_LINE_RIGHTSHIFT = max([ len(k) for k in CHECKED_FILES ]) | |
def _make_checker_line( | |
repos: list, | |
checked_repos: list, | |
occupied_spaces: int) -> str: | |
content_line = "" | |
for repo in CHECKED_REPOS: | |
if repo in checked_repos: | |
content_line += " ✓ " | |
else: | |
content_line += " " | |
return "%s%s" % ( | |
" " * (CHECKER_LINE_RIGHTSHIFT - occupied_spaces), content_line) | |
def _compare_repos_for_one_file( | |
repos: list, | |
relative_filepath_str: str, | |
must_exist: Union[bool, list]) -> bool: | |
repos_having_file = [] | |
repos_missing_necessary_file = [] # list of repos | |
hashstrs = {} # key: hash string, value: repos | |
for repo in repos: | |
filepath = Path( | |
THIS_DIR, repo, relative_filepath_str.replace('/', os.sep)) | |
if filepath.is_dir(): | |
raise RuntimeError("Is a directory: %s" % filepath) | |
if not filepath.is_file(): | |
if (type(must_exist) == bool and must_exist) \ | |
or (type(must_exist) == list and repo in must_exist): | |
repos_missing_necessary_file.append(repo) | |
continue | |
repos_having_file.append(repo) | |
hashstr = _compute_file_hash(filepath) | |
hashstrs.setdefault(hashstr, []).append(repo) | |
has_missing_necessary = len(repos_missing_necessary_file) > 0 | |
has_diff = len(hashstrs) > 1 | |
is_ok = (not has_missing_necessary) and (not has_diff) | |
if is_ok: | |
print_ok(relative_filepath_str, _make_checker_line( | |
repos, repos_having_file, len(relative_filepath_str))) | |
else: | |
print_error(relative_filepath_str, _make_checker_line( | |
repos, repos_having_file, len(relative_filepath_str))) | |
if has_missing_necessary: | |
print(" missing in: %s" % ", ".join(repos_missing_necessary_file)) | |
if has_diff: | |
print(" diff exists:") | |
for hashstr in hashstrs: | |
corresponding_repos = hashstrs[hashstr] | |
print(" %s: %s" % ( | |
hashstr[:7], ", ".join(corresponding_repos))) | |
return is_ok | |
# Check the shared files have the same content | |
def check_shared_files() -> bool: | |
print("\n===== Check shared files: same content =====") | |
error_count = 0 | |
print((6 + CHECKER_LINE_RIGHTSHIFT) * " " | |
+ "\x1b[2m%s\x1b[0m" % " ".join([ | |
(("%-4s" % e) if len(e) <= 4 else (e[:3] + ".")) for e in CHECKED_REPOS])) | |
for relative_filepath_str in sorted(CHECKED_FILES): | |
res = _compare_repos_for_one_file( | |
CHECKED_REPOS, relative_filepath_str, | |
CHECKED_FILES[relative_filepath_str]) | |
if res == False: | |
error_count += 1 | |
print("Error count: %d" % error_count) | |
return error_count == 0 | |
async def check_one_repo_master(repo_name: str, repo_path: Path) -> bool: | |
git_status, _ = _execute_get_output( | |
GIT, f"-C {repo_path} status --porcelain") | |
dirty_item_count = len([ | |
l for l in git_status.split('\n') if len(l.strip()) ]) | |
if dirty_item_count != 0: | |
print_error("%s: dirty" % repo_name) | |
print(" uncommitted: %d" % dirty_item_count) | |
return False | |
git_show, _ = _execute_get_output( | |
GIT, f"--no-pager -C {repo_path} show --no-patch --format=%H master") | |
local_master_sha = git_show.strip() | |
remote_master_sha = "N/A" | |
git_ls_remote, ret = await _async_execute_get_output( | |
GIT, f"--no-pager -C {repo_path} ls-remote --heads") | |
if ret != 0: | |
remote_master_sha = "\x1b[33;1m(network error)\x1b[0m" | |
else: | |
for line in [ l for l in git_ls_remote.split('\n') if ('\t' in l) ]: | |
line_split = line.split() | |
if line_split[-1] == "refs/heads/master": | |
remote_master_sha = line_split[0] | |
break | |
if local_master_sha != remote_master_sha: | |
print_error( | |
"%s: master branch: local != remote" % repo_name, | |
"didn't check which is newer") | |
print(" local: %s" % local_master_sha) | |
print(" remote: %s" % remote_master_sha) | |
return False | |
print_ok("%s: clean and synced with remote" % repo_name) | |
return True | |
# Check repositories' master branches are all committed, and are all synced | |
# with remote (it does NOT check which end is newer if out-of-sync) | |
async def check_repo_master_committed_and_pushed() -> bool: | |
print("\n===== Check repositories: master branch tips =====") | |
error_count = 0 | |
coroutines = [] | |
for repo_name in CHECKED_REPOS: | |
repo_path = Path(THIS_DIR, repo_name) | |
coroutines.append(check_one_repo_master(repo_name, repo_path)) | |
res = await asyncio.gather(*coroutines) | |
error_count = sum([e == False for e in res]) | |
print("Error count: %d" % error_count) | |
return error_count == 0 | |
def main() -> bool: | |
print("Check in repos:\n %s" % ", ".join(CHECKED_REPOS)) | |
for repo_name in CHECKED_REPOS: | |
repo_path = Path(THIS_DIR, repo_name) | |
if not repo_path.is_dir(): | |
sys.exit("[Error] repository not found: %s" % repo_path) | |
err_cnt = 0 | |
err_cnt += 0 if check_shared_files() else 1 | |
err_cnt += 0 if asyncio.run(check_repo_master_committed_and_pushed()) else 1 | |
if err_cnt == 0: | |
comment = "No error detected" | |
else: | |
comment = "Error detected: %d, see above" % err_cnt | |
print("\n%s." % comment) | |
return 0 if err_cnt == 0 else 1 | |
if __name__ == "__main__": | |
try: | |
sys.exit(main()) | |
except KeyboardInterrupt: | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment