Last active
December 4, 2024 07:37
-
-
Save phuong/964857107076999141747db2d377dbe4 to your computer and use it in GitHub Desktop.
Generate the report for the deca-apps
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Generate the current release report for the current branch, this script is currently working with prefix DF-XXX, VS-XXX, TK- | |
| What does it do: | |
| - Detect the different code state between release branch and main app branch (for example form-admin/main) | |
| - Detect the different env settings | |
| - Group the PR by task id and group the task type (bump, test, fix, feat...) | |
| - Generate the report and link to the Jira board | |
| Usage: | |
| python scripts/report.loc.py form-admin,form-client | |
| """ | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| MASTER_BRANCH = "form-client/main" | |
| ORG_NAME = "resola-ai" | |
| CURRENT_BRANCH = "" | |
| CURRENT_REPO_NAME = "" | |
| CURRENT_APPS = [] | |
| PR_PATTERN = r"(\((#[0-9]+)\))" | |
| ISSUE_PATTERN = r"((DF|VS)-[0-9]+)" | |
| REPORT_TEMPLATE = """ | |
| # __current_apps__ | |
| - Features: __total_features__ pr(s) | |
| - Fixes: __total_fixes__ pr(s) | |
| - Tests: __total_tests__ pr(s) | |
| - Bumps: __total_bumps__ pr(s) | |
| - Others: __total_others__ pr(s) | |
| - First commit: [#__commit__](https://github.com/__organization_name__/__current_repo__/commit/__commit__) | |
| - Latest pull request: [#__pr_id__](https://github.com/__organization_name__/__current_repo__/pull/__pr_id__) | |
| ### Features: | |
| __features__ | |
| ### Fixes: | |
| __fixes__ | |
| ### Tests: | |
| __tests__ | |
| ### Others: | |
| __others__ | |
| ### Bumps: | |
| __bumps__ | |
| ## Configurations | |
| __settings__ | |
| """ | |
| def _print(*args): | |
| """ | |
| Simulate the print function, there's always a reason to do this | |
| """ | |
| print(" ".join([str(item) for item in args])) | |
| def get_raw_commit_data(commit: str): | |
| """ | |
| Generate log file, load data and looking for the | |
| """ | |
| tmp_file_name = f"release.loc.log/{CURRENT_BRANCH.replace('/', '_')}.git.log" | |
| command = f"git log {MASTER_BRANCH}..{CURRENT_BRANCH} --oneline --decorate -n 5000 > %s" % tmp_file_name | |
| os.system(command) # nosec | |
| with open(tmp_file_name, "r") as file: | |
| _raw = file.read().split("\n") | |
| raw = [] | |
| found = False | |
| for line in _raw: | |
| if line.startswith(commit): | |
| found = True | |
| raw.append(line) | |
| if found: | |
| break | |
| if not found: | |
| raise Exception("Can not find the expected commit id") | |
| raw.reverse() | |
| return raw | |
| def parse_commit(message: str): | |
| """ | |
| Parse commit message to get the issue id, pr id and message | |
| :param message: | |
| :return: None if the PR is not in the CURRENT_APP | |
| :return: Dict with pr_id, issue_id, message, pr_ids | |
| """ | |
| pr_id = "" | |
| issue_id = "" | |
| found = False | |
| for app in CURRENT_APPS: | |
| _current_app = "(%s)" % app | |
| if _current_app in message: | |
| found = True | |
| break | |
| if not found: | |
| return None | |
| try: | |
| # Find the pull request pattern | |
| matches = re.finditer(PR_PATTERN, message, re.IGNORECASE) | |
| for _, match in enumerate(matches, start=1): | |
| pr_id = match.group(2) | |
| except IndexError: | |
| pass | |
| try: | |
| # Find the issue pattern | |
| matches = re.findall(ISSUE_PATTERN, message, re.IGNORECASE) # type: ignore | |
| if matches: | |
| issue_id = matches[0][0] | |
| except IndexError as exc: | |
| pass | |
| message = re.sub(r"\([^)]*\)", "", message) | |
| try: | |
| matches = re.match(r"(\w{9})\s(.*)", message) | |
| commit = matches.group(1) | |
| message = matches.group(2) | |
| except AttributeError: | |
| commit = "" | |
| message = "" | |
| message = message.replace("<", "").replace(">", "").replace("build:", "").strip() | |
| _commit = dict(pr_id=pr_id, commit_id=commit, issue_id=issue_id, message=message, pr_ids=[], commit_ids=[]) | |
| if pr_id: | |
| _commit["pr_ids"] = [pr_id] | |
| if commit: | |
| _commit["commit_ids"] = [commit] | |
| return _commit | |
| def parse_commit_log(commits): | |
| features_issue_id = {} | |
| fixes_issue_id = {} | |
| features = [] | |
| fixes = [] | |
| others = [] | |
| bumps = [] | |
| tests = [] | |
| total_features = 0 | |
| total_fixes = 0 | |
| total_bumps = 0 | |
| total_tests = 0 | |
| latest_commit = None | |
| for line in commits: | |
| commit = parse_commit(line) | |
| if not commit: | |
| continue | |
| latest_commit = commit | |
| commit_message = commit["message"] | |
| if not commit["issue_id"]: | |
| others.append(commit) | |
| continue | |
| if commit_message.startswith("Revert"): | |
| continue | |
| if commit_message.startswith("Merge pull request"): | |
| continue | |
| elif commit_message.startswith("feat"): | |
| total_features += 1 | |
| if commit["issue_id"]: | |
| if commit["issue_id"] not in features_issue_id: | |
| # Only get the first one | |
| features.append(commit) | |
| features_issue_id[commit["issue_id"]] = commit | |
| else: | |
| if commit["pr_id"]: | |
| features_issue_id[commit["issue_id"]]["pr_ids"].append(commit["pr_id"]) | |
| features_issue_id[commit["issue_id"]]["commit_ids"].append(commit["commit_id"]) | |
| else: | |
| others.append(commit) | |
| else: | |
| others.append(commit) | |
| elif commit_message.startswith("test"): | |
| total_tests += 1 | |
| if commit["issue_id"] in features_issue_id: | |
| # Test for features, skip it | |
| features_issue_id[commit["issue_id"]]["pr_ids"].append(commit["pr_id"]) | |
| features_issue_id[commit["issue_id"]]["commit_ids"].append(commit["commit_id"]) | |
| else: | |
| tests.append(commit) | |
| elif commit_message.startswith("fix"): | |
| total_fixes += 1 | |
| if commit["issue_id"] in features_issue_id: | |
| # Fix for current feature, skip this | |
| # Group pr id of a feature | |
| features_issue_id[commit["issue_id"]]["pr_ids"].append(commit["pr_id"]) | |
| features_issue_id[commit["issue_id"]]["commit_ids"].append(commit["commit_id"]) | |
| else: | |
| if commit["issue_id"] in fixes_issue_id: | |
| # Group pr id of a feature | |
| fixes_issue_id[commit["issue_id"]]["pr_ids"].append(commit["pr_id"]) | |
| fixes_issue_id[commit["issue_id"]]["commit_ids"].append(commit["commit_id"]) | |
| else: | |
| fixes_issue_id[commit["issue_id"]] = commit | |
| fixes.append(commit) | |
| elif commit_message.startswith("Bump") or commit_message.startswith("bump"): | |
| total_bumps += 1 | |
| bumps.append(commit) | |
| else: | |
| if commit["issue_id"] in features_issue_id: | |
| # Chore for feature, skip it | |
| continue | |
| else: | |
| others.append(commit) | |
| result = dict( | |
| features=features, | |
| fixes=fixes, | |
| bumps=bumps, | |
| others=others, | |
| tests=tests, | |
| total_features=total_features, | |
| total_fixes=total_fixes, | |
| total_bumps=total_bumps, | |
| total_others=len(others), | |
| total_tests=total_tests, | |
| latest_commit=latest_commit["commit_id"].replace("#", "") if latest_commit else "", | |
| ) | |
| return result | |
| def clean_message(message: str) -> str: | |
| ls = re.split(ISSUE_PATTERN, message) | |
| _message = ls[len(ls) - 1].strip() | |
| if _message.startswith("]"): | |
| _message = _message[1:].strip() | |
| return _message.capitalize() | |
| def generate_commit_report(logs, commit: str) -> str: | |
| report = REPORT_TEMPLATE | |
| for key, value in logs.items(): | |
| if isinstance(value, list): | |
| # Log list | |
| reports = [] | |
| for item in value: | |
| issue_id = item["issue_id"] | |
| values = (issue_id, issue_id) | |
| item["message"] = clean_message(item["message"]) | |
| if issue_id: | |
| _base_url = 'https://resola.atlassian.net/browse' if issue_id.startswith("TK") else 'https://notion.so/resola' | |
| item["issue_link"] = f"[{values[0]}]({_base_url}/{values[1]})" | |
| else: | |
| item["issue_link"] = "" | |
| if item["pr_ids"]: | |
| # Build the PR links | |
| for k, v in enumerate(item["pr_ids"]): | |
| if not v: | |
| continue | |
| item["pr_ids"][k] = "[%s](https://github.com/%s/%s/pull/%s)" % ( | |
| v, | |
| ORG_NAME, | |
| CURRENT_REPO_NAME, | |
| v.replace("#", ""), | |
| ) | |
| item["pr_ids"] = ", ".join(item["pr_ids"]) | |
| if item["commit_ids"]: | |
| # Build the commit id link, in case the pr id is not available | |
| for k, v in enumerate(item["commit_ids"]): | |
| if not v: | |
| continue | |
| item["commit_ids"][k] = "[%s](https://github.com/%s/%s/commit/%s)" % ( | |
| v, | |
| ORG_NAME, | |
| CURRENT_REPO_NAME, | |
| v.replace("#", ""), | |
| ) | |
| item["commit_ids"] = ", ".join(item["commit_ids"]) | |
| if item["pr_id"]: | |
| line = " ".join("- {issue_link} {message} ({pr_ids})".format(**item).split()) | |
| else: | |
| line = " ".join("- {issue_link} {message} ({commit_ids})".format(**item).split()) | |
| reports.append(line) | |
| report_text = "\n".join(reports) | |
| else: | |
| # Other type of data | |
| report_text = str(value) | |
| report = report.replace(f"__{key}__", report_text) | |
| report = report.replace("__commit__", commit) | |
| report = report.replace("__current_apps__", ", ".join(CURRENT_APPS)) | |
| report = report.replace("__current_repo__", CURRENT_REPO_NAME) | |
| report = report.replace("__organization_name__", ORG_NAME) | |
| return report | |
| def get_raw_settings_change(app): | |
| tmp_file_name = f"release.loc.log/{CURRENT_BRANCH.replace('/', '_')}.settings.log" | |
| os.system(f"git diff {MASTER_BRANCH}..{CURRENT_BRANCH} -- apps/{app}/.env.example > %s" % tmp_file_name) # nosec | |
| with open(tmp_file_name, "r") as file: | |
| _raw = file.read().split("\n") | |
| added = [] | |
| removed = [] | |
| for line in _raw: | |
| if line.startswith("--") or line.startswith("++") or line.startswith("+# ") or line.startswith("-# "): | |
| continue | |
| if line.startswith("-"): | |
| line = line.replace("-", "", 1) | |
| if not line: | |
| continue | |
| removed.append(line) | |
| if line.startswith("+"): | |
| line = line.replace("+", "", 1) | |
| if not line: | |
| continue | |
| added.append(line) | |
| return added, removed | |
| def generate_settings_change(report: str) -> str: | |
| report_text = "" | |
| for app in CURRENT_APPS: | |
| added, removed = get_raw_settings_change(app) | |
| report_text += "\n### %s\n\n" % app | |
| if not added and not removed: | |
| report_text += "- N/A" | |
| else: | |
| if added: | |
| report_text += "\n### Added: \n\n" | |
| for line in added: | |
| report_text += "- [ ] `%s`\n" % line | |
| if removed: | |
| report_text += "\n### Removed: \n\n" | |
| for line in removed: | |
| report_text += "- [ ] `%s`\n" % line | |
| report_text += "\n" | |
| report = report.replace("__settings__", report_text) | |
| return report | |
| def main(): | |
| _print(f"Current branch:{CURRENT_BRANCH}") | |
| _print(f"Current repo name: {CURRENT_REPO_NAME}") | |
| _print(f"Current app: {CURRENT_APPS}") | |
| command = f"git log --oneline --cherry-pick --right-only --no-merges {MASTER_BRANCH}...{CURRENT_BRANCH}" | |
| output = subprocess.check_output(command.split(" ")).decode("utf-8") | |
| commit = output.strip().split("\n")[-1].split(" ")[0] | |
| _print("First commit id:", commit) | |
| log_dir = "release.loc.log" | |
| if not os.path.isdir(log_dir): | |
| os.mkdir(log_dir) | |
| tmp_file_name = f"{log_dir}/{CURRENT_BRANCH.replace('/', '_')}.report.log" | |
| try: | |
| raw = get_raw_commit_data(commit) | |
| separated_log = parse_commit_log(raw) | |
| report = generate_commit_report(separated_log, commit) | |
| report = generate_settings_change(report) | |
| with open(tmp_file_name, "w") as file: | |
| file.write(report) | |
| _print("Generated report file for branch: %s\nSaved to: %s" % (CURRENT_BRANCH, tmp_file_name)) | |
| command = f"xclip -sel c < {tmp_file_name}" | |
| os.system(command) | |
| _print("Copied to clipboard") | |
| except Exception as e: | |
| raise | |
| if __name__ == "__main__": | |
| CURRENT_BRANCH = ( | |
| subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"]).decode("utf-8").replace("\n", "") # nosec | |
| ) | |
| CURRENT_REPO_NAME = ( | |
| subprocess.check_output(["git", "rev-parse", "--show-toplevel"]) # nosec | |
| .decode("utf-8") | |
| .replace("\n", "") | |
| .split("/")[-1] | |
| ) | |
| CURRENT_APPS = [item.strip() for item in sys.argv[1].split(",")] | |
| main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment