Skip to content

Instantly share code, notes, and snippets.

@phuong
Last active December 4, 2024 07:37
Show Gist options
  • Save phuong/964857107076999141747db2d377dbe4 to your computer and use it in GitHub Desktop.
Save phuong/964857107076999141747db2d377dbe4 to your computer and use it in GitHub Desktop.
Generate the report for the deca-apps
"""
Generate the release report for the current branch. The script currently works with task ids prefixed DF-XXX, VS-XXX and TK-XXX.
What it does:
- Detects the code differences between the release branch and the main app branch (for example form-admin/main)
- Detects the differences in the env settings
- Groups the PRs by task id and by task type (bump, test, fix, feat...)
- Generates the report and links it to the Jira board
Usage:
python scripts/report.loc.py form-admin,form-client
"""
import os
import re
import subprocess
import sys
# Branch that the current (release) branch is compared against.
MASTER_BRANCH = "form-client/main"
# Organization segment used when building repository links.
ORG_NAME = "resola-ai"
# Placeholders: the ``__main__`` guard at the bottom of the file fills these in
# from git and from the command line before main() runs.
CURRENT_BRANCH = ""
CURRENT_REPO_NAME = ""
CURRENT_APPS = []
# A parenthesised pull-request reference such as "(#123)"; group 2 is "#123".
PR_PATTERN = r"(\((#[0-9]+)\))"
# Task ids such as DF-12, VS-3 or TK-45; group 1 is the whole id.
# TK is included because the module docstring lists it and the report
# generator chooses the issue link by checking for a "TK" prefix.
ISSUE_PATTERN = r"((DF|VS|TK)-[0-9]+)"
# Markdown skeleton of the report. Every ``__name__`` token is a placeholder
# substituted by generate_commit_report() / generate_settings_change().
# NOTE(review): the links use the host github.com, not github.com -
# confirm this is the intended host.
REPORT_TEMPLATE = """
# __current_apps__
- Features: __total_features__ pr(s)
- Fixes: __total_fixes__ pr(s)
- Tests: __total_tests__ pr(s)
- Bumps: __total_bumps__ pr(s)
- Others: __total_others__ pr(s)
- First commit: [#__commit__](https://github.com/__organization_name__/__current_repo__/commit/__commit__)
- Latest pull request: [#__pr_id__](https://github.com/__organization_name__/__current_repo__/pull/__pr_id__)
### Features:
__features__
### Fixes:
__fixes__
### Tests:
__tests__
### Others:
__others__
### Bumps:
__bumps__
## Configurations
__settings__
"""
def _print(*args):
    """
    Write all arguments to stdout on a single line, separated by one space.

    Each argument is converted with ``str`` first.
    """
    text = " ".join(map(str, args))
    print(text)
def get_raw_commit_data(commit: str):
    """
    Return the one-line log entries of the release, oldest first.

    Runs ``git log MASTER_BRANCH..CURRENT_BRANCH --oneline --decorate`` (at most
    5000 entries), stores the raw output in ``release.loc.log/`` and keeps the
    entries from the top of the log down to the first line that starts with
    ``commit``.

    :param commit: abbreviated id of the first commit of the release
    :return: list of log lines, reversed so that the ``commit`` line comes first
    :raises subprocess.CalledProcessError: if git exits with an error
    :raises Exception: if no log line starts with ``commit``
    """
    tmp_file_name = f"release.loc.log/{CURRENT_BRANCH.replace('/', '_')}.git.log"
    # An argument list keeps branch names away from the shell, and a failing
    # git now raises instead of silently leaving an empty log behind.
    output = subprocess.check_output(  # nosec
        ["git", "log", f"{MASTER_BRANCH}..{CURRENT_BRANCH}", "--oneline", "--decorate", "-n", "5000"]
    ).decode("utf-8")
    # The raw log is still written to disk, as before, for later inspection.
    with open(tmp_file_name, "w", encoding="utf-8") as file:
        file.write(output)
    raw = []
    for line in output.split("\n"):
        raw.append(line)
        if line.startswith(commit):
            # The matching line is included, everything after it is ignored.
            break
    else:
        raise Exception("Can not find the expected commit id")
    raw.reverse()
    return raw
def parse_commit(message: str):
    """
    Parse one ``git log --oneline`` line to get the issue id, pr id and message

    :param message: a single log line that starts with the abbreviated commit id
    :return: None if the line does not contain ``(app)`` for any app in CURRENT_APPS
    :return: Dict with pr_id, commit_id, issue_id, message, pr_ids, commit_ids
    """
    if not any(f"({app})" in message for app in CURRENT_APPS):
        return None

    # A line may contain several "(#123)" references; the last one is kept.
    pr_id = ""
    for match in re.finditer(PR_PATTERN, message, re.IGNORECASE):
        pr_id = match.group(2)

    # The first issue id found on the line identifies the task.
    issue_match = re.search(ISSUE_PATTERN, message, re.IGNORECASE)
    issue_id = issue_match.group(1) if issue_match else ""

    # Drop every parenthesised chunk: the "(app)" marker, the "(#123)"
    # reference and anything else in parentheses.
    stripped = re.sub(r"\([^)]*\)", "", message)
    # git chooses the abbreviation length itself, so the id is not always
    # 9 characters long; accept any run of hex digits a commit id can have.
    head = re.match(r"([0-9a-fA-F]{4,64})\s(.*)", stripped)
    if head:
        commit, text = head.group(1), head.group(2)
    else:
        commit, text = "", ""
    text = text.replace("<", "").replace(">", "").replace("build:", "").strip()

    return dict(
        pr_id=pr_id,
        commit_id=commit,
        issue_id=issue_id,
        message=text,
        pr_ids=[pr_id] if pr_id else [],
        commit_ids=[commit] if commit else [],
    )
def _attach_commit(head, commit):
    """Record the ids of ``commit`` on ``head``, the commit that represents its issue."""
    # Empty ids are skipped: they would show up as dangling ", " separators
    # once the ids are joined into a list of links.
    if commit["pr_id"]:
        head["pr_ids"].append(commit["pr_id"])
    if commit["commit_id"]:
        head["commit_ids"].append(commit["commit_id"])


def parse_commit_log(commits):
    """
    Sort the log lines into feature / fix / test / bump / other groups.

    Each line goes through parse_commit(); lines it rejects are ignored.
    Commits that share an issue id with an earlier feature (or, for fixes, an
    earlier fix) are not listed again: their ids are attached to that earlier
    commit instead.

    :param commits: iterable of ``git log --oneline`` lines
    :return: Dict with the lists features, fixes, bumps, others, tests, the
        counters total_features, total_fixes, total_bumps, total_others,
        total_tests, latest_commit (commit id of the last parsed line) and
        pr_id (number of the last pull request seen, without "#")
    """
    features_by_issue = {}
    fixes_by_issue = {}
    features = []
    fixes = []
    others = []
    bumps = []
    tests = []
    total_features = 0
    total_fixes = 0
    total_bumps = 0
    total_tests = 0
    latest_commit = None
    latest_pr_id = ""
    for line in commits:
        commit = parse_commit(line)
        if not commit:
            continue
        latest_commit = commit
        if commit["pr_id"]:
            latest_pr_id = commit["pr_id"]
        issue_id = commit["issue_id"]
        commit_message = commit["message"]
        if not issue_id:
            # NOTE(review): this runs before the type checks below, so a
            # "bump ..." commit without an issue id is listed under others and
            # never counted as a bump - confirm that this is intended.
            others.append(commit)
            continue
        if commit_message.startswith(("Revert", "Merge pull request")):
            continue
        feature = features_by_issue.get(issue_id)
        if commit_message.startswith("feat"):
            total_features += 1
            if feature is None:
                # Only the first feature commit of an issue is listed.
                features.append(commit)
                features_by_issue[issue_id] = commit
            elif commit["pr_id"]:
                _attach_commit(feature, commit)
            else:
                others.append(commit)
        elif commit_message.startswith("test"):
            total_tests += 1
            if feature is not None:
                # Test for a listed feature: only remember its ids.
                _attach_commit(feature, commit)
            else:
                tests.append(commit)
        elif commit_message.startswith("fix"):
            total_fixes += 1
            if feature is not None:
                # Fix for a listed feature: only remember its ids.
                _attach_commit(feature, commit)
            elif issue_id in fixes_by_issue:
                _attach_commit(fixes_by_issue[issue_id], commit)
            else:
                fixes_by_issue[issue_id] = commit
                fixes.append(commit)
        elif commit_message.startswith(("Bump", "bump")):
            total_bumps += 1
            bumps.append(commit)
        elif feature is None:
            # Any other commit type is dropped when its issue is already
            # listed as a feature, otherwise it goes to others.
            others.append(commit)
    result = dict(
        features=features,
        fixes=fixes,
        bumps=bumps,
        others=others,
        tests=tests,
        total_features=total_features,
        total_fixes=total_fixes,
        total_bumps=total_bumps,
        total_others=len(others),
        total_tests=total_tests,
        latest_commit=latest_commit["commit_id"].replace("#", "") if latest_commit else "",
        # Gives the "__pr_id__" placeholder of the report template a value;
        # it used to be left in the report unreplaced.
        pr_id=latest_pr_id.replace("#", ""),
    )
    return result
def clean_message(message: str) -> str:
    """
    Reduce a commit message to the text that follows its last issue id.

    A closing "]" left over from a bracketed id such as "[DF-12]" is removed
    and the result is passed through ``str.capitalize`` (first character upper
    case, the rest lower case). A message without an issue id is only stripped
    and capitalized.

    :param message: commit message, possibly containing issue ids
    :return: the cleaned message
    """
    # Case-insensitive, like the issue detection in parse_commit: a lower-case
    # id such as "df-12" is recognised there and has to be cut off here too.
    tail = re.split(ISSUE_PATTERN, message, flags=re.IGNORECASE)[-1].strip()
    if tail.startswith("]"):
        tail = tail[1:].strip()
    return tail.capitalize()
def _build_links(ids, kind):
    """Return comma separated markdown links for the non-empty ``ids``.

    ``kind`` is the URL segment that follows the repository name
    ("pull" or "commit").
    """
    return ", ".join(
        "[%s](https://github.com/%s/%s/%s/%s)" % (value, ORG_NAME, CURRENT_REPO_NAME, kind, value.replace("#", ""))
        for value in ids
        if value
    )


def _format_report_line(item):
    """Render one parsed commit as a markdown bullet; ``item`` is not modified."""
    issue_id = item["issue_id"]
    if issue_id:
        # Issue ids are matched case-insensitively, so compare the prefix the same way.
        is_jira = issue_id.upper().startswith("TK")
        base_url = "https://resola.atlassian.net/browse" if is_jira else "https://notion.so/resola"
        issue_link = f"[{issue_id}]({base_url}/{issue_id})"
    else:
        issue_link = ""
    message = clean_message(item["message"])
    # PR links are used when the commit has a PR id, commit links otherwise.
    if item["pr_id"]:
        links = _build_links(item["pr_ids"], "pull")
    else:
        links = _build_links(item["commit_ids"], "commit")
    text = f"- {issue_link} {message}"
    if links:
        # Without any id the parenthesis is left out rather than printed empty.
        text += f" ({links})"
    # Collapse repeated whitespace, e.g. the gap left by an empty issue link.
    return " ".join(text.split())


def generate_commit_report(logs, commit: str) -> str:
    """
    Fill REPORT_TEMPLATE with the grouped commits.

    For every key of ``logs`` the placeholder ``__<key>__`` is replaced: list
    values become one markdown bullet per commit, any other value is inserted
    via ``str``. Afterwards the commit, app, repository and organization
    placeholders are filled in. The dicts inside ``logs`` are left untouched.

    :param logs: Dict as returned by parse_commit_log
    :param commit: id of the first commit of the release
    :return: the report text; the ``__settings__`` placeholder is not handled here
    """
    report = REPORT_TEMPLATE
    for key, value in logs.items():
        if isinstance(value, list):
            report_text = "\n".join(_format_report_line(item) for item in value)
        else:
            report_text = str(value)
        report = report.replace(f"__{key}__", report_text)
    report = report.replace("__commit__", commit)
    report = report.replace("__current_apps__", ", ".join(CURRENT_APPS))
    report = report.replace("__current_repo__", CURRENT_REPO_NAME)
    report = report.replace("__organization_name__", ORG_NAME)
    return report
def get_raw_settings_change(app):
    """
    Collect the lines added to and removed from ``apps/<app>/.env.example``.

    Runs ``git diff MASTER_BRANCH..CURRENT_BRANCH`` limited to that file and
    stores the raw diff in ``release.loc.log/``. Lines starting with "--",
    "++", "+# " or "-# " are skipped, as are added/removed lines that are
    empty after the marker.

    :param app: app directory name below ``apps/``
    :return: tuple ``(added, removed)`` of lists of lines without the leading "+" / "-"
    :raises subprocess.CalledProcessError: if git exits with an error
    """
    tmp_file_name = f"release.loc.log/{CURRENT_BRANCH.replace('/', '_')}.settings.log"
    # An argument list keeps the branch and app names away from the shell, and
    # a failing git raises instead of being reported as "no changes".
    output = subprocess.check_output(  # nosec
        ["git", "diff", f"{MASTER_BRANCH}..{CURRENT_BRANCH}", "--", f"apps/{app}/.env.example"]
    ).decode("utf-8")
    # The raw diff is still written to disk, as before, for later inspection.
    with open(tmp_file_name, "w", encoding="utf-8") as file:
        file.write(output)
    added = []
    removed = []
    for line in output.split("\n"):
        if line.startswith(("--", "++", "+# ", "-# ")):
            continue
        content = line[1:]
        if line.startswith("-"):
            if content:
                removed.append(content)
        # elif: a removed line whose content starts with "+" must not be
        # counted a second time as an added line.
        elif line.startswith("+"):
            if content:
                added.append(content)
    return added, removed
def generate_settings_change(report: str) -> str:
    """
    Replace the ``__settings__`` placeholder with the env changes of every app.

    For each app in CURRENT_APPS a heading is written, followed by either
    "- N/A" or check-list entries for the added and the removed settings.

    :param report: report text containing the ``__settings__`` placeholder
    :return: the report with the placeholder filled in
    """
    chunks = []
    for app in CURRENT_APPS:
        added, removed = get_raw_settings_change(app)
        chunks.append("\n### %s\n\n" % app)
        if added or removed:
            if added:
                chunks.append("\n### Added: \n\n")
                chunks.extend("- [ ] `%s`\n" % entry for entry in added)
            if removed:
                chunks.append("\n### Removed: \n\n")
                chunks.extend("- [ ] `%s`\n" % entry for entry in removed)
        else:
            chunks.append("- N/A")
        chunks.append("\n")
    return report.replace("__settings__", "".join(chunks))
def main():
    """
    Build the release report for CURRENT_BRANCH, save it and copy it to the clipboard.

    The report is written to ``release.loc.log/<branch>.report.log``. Copying
    relies on the ``xclip`` command; when it is missing or fails the report
    file is still kept and a message says so.

    :raises subprocess.CalledProcessError: if the initial ``git log`` call fails
    """
    _print(f"Current branch:{CURRENT_BRANCH}")
    _print(f"Current repo name: {CURRENT_REPO_NAME}")
    _print(f"Current app: {CURRENT_APPS}")
    output = subprocess.check_output(
        [
            "git",
            "log",
            "--oneline",
            "--cherry-pick",
            "--right-only",
            "--no-merges",
            f"{MASTER_BRANCH}...{CURRENT_BRANCH}",
        ]
    ).decode("utf-8")
    # First word of the last output line, i.e. the id of the last listed commit.
    commit = output.strip().split("\n")[-1].split(" ")[0]
    _print("First commit id:", commit)
    log_dir = "release.loc.log"
    os.makedirs(log_dir, exist_ok=True)
    tmp_file_name = f"{log_dir}/{CURRENT_BRANCH.replace('/', '_')}.report.log"
    raw = get_raw_commit_data(commit)
    separated_log = parse_commit_log(raw)
    report = generate_commit_report(separated_log, commit)
    report = generate_settings_change(report)
    with open(tmp_file_name, "w") as file:
        file.write(report)
    _print("Generated report file for branch: %s\nSaved to: %s" % (CURRENT_BRANCH, tmp_file_name))
    try:
        # stdout/stderr are deliberately not captured: only the exit status is needed.
        with open(tmp_file_name, "rb") as report_file:
            copied = subprocess.run(["xclip", "-sel", "c"], stdin=report_file).returncode == 0  # nosec
    except OSError:
        # xclip is not installed or cannot be executed.
        copied = False
    if copied:
        _print("Copied to clipboard")
    else:
        _print("Could not copy to clipboard (xclip missing or failed)")
if __name__ == "__main__":
    # The apps come from the first argument as a comma separated list; empty
    # entries (e.g. from a trailing comma) are dropped because an empty app
    # name would match every commit line containing "()".
    _apps = [item.strip() for item in sys.argv[1].split(",")] if len(sys.argv) > 1 else []
    _apps = [item for item in _apps if item]
    if not _apps:
        # A missing argument used to end in a bare IndexError.
        sys.exit("Usage: python scripts/report.loc.py form-admin,form-client")
    CURRENT_BRANCH = (
        subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"]).decode("utf-8").replace("\n", "")  # nosec
    )
    # The repository name is the last path segment of the work tree root.
    CURRENT_REPO_NAME = (
        subprocess.check_output(["git", "rev-parse", "--show-toplevel"])  # nosec
        .decode("utf-8")
        .replace("\n", "")
        .split("/")[-1]
    )
    CURRENT_APPS = _apps
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment