Last active
November 5, 2024 17:54
-
-
Save tigerhawkvok/b480daff38493cc6723a01fcec1cd0dd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!python3
# https://stackoverflow.com/a/79108334/1877527
# cSpell: words creatordate, extensionless
import datetime as dt
import re
import subprocess
from typing import Collection, List, Optional
# Pattern (one capturing group) for a commit reference: either a 7-40 character
# lowercase hex hash or a `v`-prefixed dotted version tag (e.g. `v1.2.3`),
# optionally followed by a suffix such as `-dev`, `_x.y`, `rc1`, `beta2`.
COMMIT_TAG_REGEX = r"((?:[a-f0-9]{7,40}|v(?:\d+\.)+\d+)(?:(?:[\-_\.][a-z0-9\.]+)|(?:a(?:lpha)?|b(?:eta)?|r(?:elease)?c(?:andidate)?)\d+)?)"  # cSpell: disable-line
# Pattern for the merge note left in commit messages, e.g.
# "Merge from dev commit: <hash>" or "merge tag v1.2.3"; group 1 is the
# referenced commit/tag as matched by COMMIT_TAG_REGEX.
DEFAULT_MERGE_COMMENT_PREFIX = r"merge (?:(?:in|from)(?:\s+[a-z_]+)?)?\s*(?:commit|tag):?\s*" + COMMIT_TAG_REGEX
__all__ = ["waterfallMerge"]


def waterfallMerge(fromBranch:str, message:Optional[str]= None, featureBranchLabel:Optional[str]= None, lastMergeID:Optional[str]= None, mergeCommentRegex:str= DEFAULT_MERGE_COMMENT_PREFIX, *, sign:bool= False, debug:bool = False, extensionlessBranches:Collection[str]= ("main", "master")) -> None:
    """
    Perform a waterfall merge from one branch into the current one.

    The last point at which the two branches were joined is chosen from the
    following pool of candidates:

    - Tags merged into the current branch that pair up with a tag ending in
      the name of the branch to merge from. A from-branch tag is paired by
      replacing its `-<fromBranch>` part with `-<currentBranch>` (or with
      nothing when the current branch is listed in `extensionlessBranches`)
      and looking the result up among the current branch's tags. For example,
      on branch `test` merging `dev`, tag `v1.1.10-dev` pairs with tag
      `v1.1.10-test`.
    - Commit messages (including notes) in the current history that match
      `mergeCommentRegex`, e.g. "Merge from dev commit: <commit>". Group 1 of
      the match is resolved with `git rev-parse` and becomes a candidate.
    - If the pool is empty, the merge base of the current branch and the
      branch to merge from is used instead.

    The candidate with the most recent commit date is then merged, followed by
    `fromBranch`, into a freshly created temporary branch, which is finally
    squash-merged back into the current branch and deleted.

    Based on https://stackoverflow.com/a/79108334/1877527

    Parameters
    ----------
    fromBranch: str
        The branch to merge from.
    message: Optional[str]
        The commit message for the merge. If not provided, a default message
        is used. A "Merge from <fromBranch> commit: <hash>" note is always
        appended so later runs can find this merge point.
    featureBranchLabel: Optional[str]
        The feature label for the branch. If provided, the working branch is
        named `feature-<label>-...` and the final squash-merge is not done.
    lastMergeID: Optional[str]
        The last merge commit ID. If not provided, the above pool is used to
        find the most recent reference.
    mergeCommentRegex: str
        Regex to find merge comments; group 1 must capture the commit or tag.
        Defaults to `DEFAULT_MERGE_COMMENT_PREFIX`. An empty string disables
        the commit-message search.
    sign: bool, kwarg-only (default False)
        Sign the commit.
    debug: bool, kwarg-only (default False)
        Enable debug output. This is a dry run: a `RuntimeError` is raised
        just before the temporary branch would be created.
    extensionlessBranches: Collection[str], kwarg-only (default ("main", "master"))
        When the current branch is one of these, from-branch tags are paired
        with tags that carry no branch suffix.

    Raises
    ------
    ValueError
        If the current branch cannot be determined, equals `fromBranch`,
        `lastMergeID` cannot be resolved, or no usable merge point is found.
    RuntimeError
        If `debug` is set (dry run), or if one of the merges into the
        temporary branch fails.
    subprocess.CalledProcessError
        If one of the remaining checked git commands fails.
    """
    branchQuery = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], capture_output=True, text=True)
    currentBranch = branchQuery.stdout.strip()
    if branchQuery.returncode != 0 or not currentBranch:
        # Without a branch name every later git call would operate on garbage.
        raise ValueError("Error: Unable to determine the current branch.")
    fromBranch = fromBranch.strip()
    if currentBranch == fromBranch:
        raise ValueError("Error: The current branch and the branch to merge from are the same.")
    if debug:
        print("Current branch:", currentBranch)

    if isinstance(lastMergeID, str) and len(lastMergeID) > 0:
        # The caller told us the merge point explicitly; just validate it.
        try:
            commitHash = subprocess.run(["git", "rev-parse", lastMergeID], capture_output=True, text=True, check=True).stdout.strip()
        except subprocess.CalledProcessError as e:
            raise ValueError(f"Error: {lastMergeID} is not a valid commit hash.") from e
        print("Merging from commit:", commitHash)
    else:
        matched: List[str] = []

        # --- Candidate source 1: paired tags -------------------------------
        tagNames = subprocess.run(["git", "tag", "-i", "--sort=-creatordate", "--merged"], capture_output=True, text=True).stdout.strip().splitlines()
        allTagNames = subprocess.run(["git", "tag", "-i", "--sort=-creatordate"], capture_output=True, text=True).stdout.strip().splitlines()
        if tagNames:
            extensionless = currentBranch in extensionlessBranches
            if extensionless:
                hereTagNames = tagNames
            else:
                hereTagNames = [tag for tag in tagNames if tag.endswith(currentBranch)]
            if debug:
                print("Current branch tags:", hereTagNames)
            thereTagNames = [tag for tag in allTagNames if tag.endswith(fromBranch)]
            if debug:
                print("From branch tags:", thereTagNames)
                print("Extensionless?", extensionless)
            # Rewrite each from-branch tag into the name it would carry on the
            # current branch and keep those that actually exist here.
            replacement = "" if extensionless else f"-{currentBranch}"
            hereTagSet = set(hereTagNames)
            matchedTags = [tag for tag in thereTagNames if tag.replace(f"-{fromBranch}", replacement) in hereTagSet]
            if debug:
                print("Found synced tags:", matchedTags)
            for tag in matchedTags:
                # `^{}` peels annotated tags down to the object they point at.
                tagCommit = subprocess.run(["git", "rev-parse", tag + r"^{}"], capture_output=True, text=True).stdout.strip()
                if tagCommit:  # a failed rev-parse yields "", which is not a candidate
                    matched.append(tagCommit)
            if debug:
                print("Matched commit hashes:", matched)

        # --- Candidate source 2: merge notes in commit messages ------------
        if len(mergeCommentRegex) > 0:
            reg = re.compile(mergeCommentRegex, re.IGNORECASE | re.MULTILINE)
            # Each log entry ends in four newlines and entries are joined by
            # one more, so five newlines separate consecutive commits.
            mergeCommitMessages = subprocess.run(["git", "log", "--notes", "-i", r"--pretty=format:%H%n%s%b%N%n%n%n%n"], capture_output=True, text=True).stdout.strip().split("\n\n\n\n\n")

            def _revHelper(rev:Optional[str]) -> Optional[str]:
                """Resolve `rev` to a full hash, or None if git does not know it."""
                if not isinstance(rev, str):
                    return None
                try:
                    return subprocess.run(["git", "rev-parse", rev], capture_output=True, text=True, check=True).stdout.strip()
                except subprocess.CalledProcessError:
                    return None

            reObjs = [reg.search(commitMessage) for commitMessage in mergeCommitMessages]
            if debug:
                print("Merge commit messages (last 5):", mergeCommitMessages[:5])
                print("Raw matches", reObjs[:5])
            # Group 1 is the merge point recorded by the previous waterfall merge.
            mergePoints = [found.group(1) for found in reObjs if found is not None]
            _messageMatches = [_revHelper(point) for point in mergePoints]
            if debug:
                print("Matched messages", _messageMatches[:5])
            resolved = [rev for rev in _messageMatches if rev is not None]
            validRevs = [rev for rev in resolved if rev not in matched]
            if validRevs:
                if debug:
                    print("Found new valid revs with commit messages matching regex:", validRevs)
                matched += validRevs
            elif debug:
                if resolved:
                    print("All matched revs are already in the list of matched revs.")
                else:
                    print("No valid revs found with commit messages matching regex.")

        if not matched:
            # Nothing recorded anywhere: fall back to the common ancestor.
            mergeBase = subprocess.run(["git", "merge-base", currentBranch, fromBranch], capture_output=True, text=True)
            commitHash = mergeBase.stdout.strip()
            if mergeBase.returncode != 0 or not commitHash:
                raise ValueError("Error: No valid commit hashes found.")
            print("No merge points found, using last merge base:", commitHash)
        else:
            # Pick the candidate with the latest commit date.
            tz = dt.timezone(-dt.timedelta(hours=8))
            mostRecent = dt.datetime.min.replace(tzinfo=tz)
            succeeded = False
            for i, rev in enumerate(matched, 1):
                args = ["git", "log", rev, "-n", "1", "--date=iso8601-strict"]
                try:
                    _lines = subprocess.run(args, capture_output=True, text=True).stdout.strip().splitlines()
                    dateLines = [line for line in _lines if line.lower().startswith("date")]
                    # Fall back to the usual position of the date header.
                    ds = dateLines[0] if dateLines else _lines[2]
                    commitDateStr = ds.split(" ").pop()
                    if commitDateStr.endswith("Z"):
                        # Newer git prints UTC as "Z", which older
                        # `datetime.fromisoformat` versions reject.
                        commitDateStr = commitDateStr[:-1] + "+00:00"
                    commitDate = dt.datetime.fromisoformat(commitDateStr)
                except (IndexError, ValueError) as e:
                    if debug:
                        print(f"Error getting commit date for revision #{i} of {len(matched)} = {rev}: {e}")
                        print(" ".join(args))
                    if len(matched) == 1:
                        # The only candidate is unusable; surface the real error.
                        raise
                else:
                    succeeded = True
                    if commitDate > mostRecent:
                        mostRecent = commitDate
                        commitHash = rev
            if not succeeded:
                raise ValueError("Error: No valid commit hashes found.")
            print("Most recent reference:", commitHash)

    # Now we have the most recent reference
    if not isinstance(message, str) or len(message) == 0:
        message = f"Merge `{fromBranch}` into `{currentBranch}`"
    # Record the merged commit so the next run can find it via mergeCommentRegex.
    message += f"\n\nMerge from {fromBranch} commit: " + subprocess.run(["git", "log", "-n", "1", "--pretty=format:%H", fromBranch], capture_output=True, text=True, check=True).stdout.strip()
    if debug:
        print("Using commit message:", message)

    # Create a new branch
    branchPrefix = "waterfallMerge"
    ABORT_BEFORE_SQUASH = isinstance(featureBranchLabel, str) and len(featureBranchLabel) > 0
    if ABORT_BEFORE_SQUASH:
        branchPrefix = "feature-" + re.sub("[^a-z0-9]", "-", featureBranchLabel, count=0, flags=re.IGNORECASE | re.MULTILINE).strip("-")
    workingBranch = f"{branchPrefix}-{fromBranch}-into-{currentBranch}-{commitHash[:8]}"
    if debug:
        # Debug mode is a dry run: nothing in the repository is modified.
        raise RuntimeError(f"Stopping before creating temporary branch `{workingBranch}` and subsequent merge.")
    subprocess.run(["git", "switch", "-c", workingBranch], check=True, stdout=subprocess.PIPE)

    # Merge in the last merge point, then the branch itself
    try:
        subprocess.run(["git", "merge", commitHash, "--no-edit"], check=True, stdout=subprocess.PIPE)
        subprocess.run(["git", "merge", fromBranch, "--no-edit"], check=True, stdout=subprocess.PIPE)
    except (subprocess.CalledProcessError, OSError) as e:
        # Best-effort abort: if no merge is in progress the abort itself fails,
        # and that must not hide the original error.
        subprocess.run(["git", "merge", "--abort"], check=False, stdout=subprocess.PIPE)
        raise RuntimeError(f"Merge failed -- you're left on the temporary branch {workingBranch} with the aborted merge state. You originated from {currentBranch}.") from e
    if ABORT_BEFORE_SQUASH:
        print(f"Stopping before switching branches back and squashing temporary branch `{workingBranch}` into `{currentBranch}`. You're still on `{workingBranch}`.")
        return

    subprocess.run(["git", "switch", currentBranch], check=True, stdout=subprocess.PIPE)
    subprocess.run(["git", "merge", "--squash", workingBranch], check=True, stdout=subprocess.PIPE)
    commitArgs = ["git", "commit"]
    if sign:
        commitArgs.append("-S")
    commitArgs += ["-m", message]
    subprocess.run(commitArgs, check=True, stdout=subprocess.PIPE)

    # Clean up
    subprocess.run(["git", "branch", "-D", workingBranch], check=True, stdout=subprocess.PIPE)
    # Debug runs never get this far, so report success unconditionally.
    print(f"Merged successfully and removed temporary merge branch {workingBranch}.")
if __name__ == "__main__":
    # Command-line entry point: parse the options and forward every one of
    # them to waterfallMerge().
    import argparse
    parser = argparse.ArgumentParser(description= "Perform a waterfall merge from one branch to another.")
    parser.add_argument("fromBranch", type= str, help= "The branch to merge from.")
    parser.add_argument("-b", "--branch-label", type= str, default= None, help="The feature label for the branch. If provided, does not complete the final squash-merge.")
    parser.add_argument("lastMergeID", type= str, nargs= '?', default =None, help= "The last merge commit ID (optional).")
    parser.add_argument("-m", "--message", type= str, default= None, help="The commit message for the merge.")
    parser.add_argument("--mergeCommentRegex", type= str, default= DEFAULT_MERGE_COMMENT_PREFIX, help= "Regex to find merge comments.")
    parser.add_argument("-s", "--sign", action= "store_true", help= "Sign the commit.")
    parser.add_argument("--debug", action= "store_true", help= "Enable debug output.")
    parser.add_argument("--extensionlessBranches", type= str, nargs= '*', default= ["main", "master"], help= "Branches that are considered extensionless.")
    args = parser.parse_args()
    waterfallMerge(
        fromBranch= args.fromBranch,
        message= args.message,
        # argparse stores `--branch-label` as `branch_label`; without this the
        # option would be accepted but silently ignored.
        featureBranchLabel= args.branch_label,
        lastMergeID= args.lastMergeID,
        mergeCommentRegex= args.mergeCommentRegex,
        sign= args.sign,
        debug= args.debug,
        extensionlessBranches= args.extensionlessBranches
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment