Skip to content

Instantly share code, notes, and snippets.

@tigerhawkvok
Last active November 5, 2024 17:54
Show Gist options
  • Save tigerhawkvok/b480daff38493cc6723a01fcec1cd0dd to your computer and use it in GitHub Desktop.
Save tigerhawkvok/b480daff38493cc6723a01fcec1cd0dd to your computer and use it in GitHub Desktop.
#!python3
# https://stackoverflow.com/a/79108334/1877527
# cSpell: words creatordate, extensionless
from typing import Optional, List, Collection
import subprocess
import datetime as dt
import re
# Pattern (single capture group) for a commit reference: either a 7-40 character
# lowercase hex hash or a `v`-prefixed dotted numeric version, optionally followed
# by a suffix introduced by `-`, `_` or `.` (e.g. `-dev`) or by a pre-release
# marker with a number (e.g. `a1`, `beta2`, `rc3`).
COMMIT_TAG_REGEX = r"((?:[a-f0-9]{7,40}|v(?:\d+\.)+\d+)(?:(?:[\-_\.][a-z0-9\.]+)|(?:a(?:lpha)?|b(?:eta)?|r(?:elease)?c(?:andidate)?)\d+)?)" # cSpell: disable-line
# Default pattern for locating a merge note in a commit message: the word "merge",
# an optional "in"/"from" (optionally followed by a word of letters/underscores),
# then "commit" or "tag", an optional colon, and finally a COMMIT_TAG_REGEX reference.
DEFAULT_MERGE_COMMENT_PREFIX = r"merge (?:(?:in|from)(?:\s+[a-z_]+)?)?\s*(?:commit|tag):?\s*" + COMMIT_TAG_REGEX
# Only the entry point is exported by `from <module> import *`.
__all__ = ["waterfallMerge"]
def _git(*args:str, check:bool= False) -> str:
    """Run `git` with the given arguments and return its stripped standard output."""
    return subprocess.run(["git", *args], capture_output=True, text=True, check=check).stdout.strip()


def _resolveRev(rev:Optional[str]) -> Optional[str]:
    """Resolve a revision with `git rev-parse`; return None when it cannot be resolved."""
    if not isinstance(rev, str):
        return None
    try:
        # check=True matters: on failure rev-parse echoes its argument to stdout,
        # which must not be mistaken for a commit hash.
        return _git("rev-parse", rev, check=True)
    except subprocess.CalledProcessError:
        return None


def _tagMergePoints(currentBranch:str, fromBranch:str, extensionlessBranches:Collection[str], debug:bool) -> List[str]:
    """Return commits of `fromBranch` tags whose counterpart tag is merged into the current branch."""
    tagNames = _git("tag", "-i", "--sort=-creatordate", "--merged").splitlines()
    if not tagNames:
        return []
    allTagNames = _git("tag", "-i", "--sort=-creatordate").splitlines()
    extensionless = currentBranch in extensionlessBranches
    if extensionless:
        hereTagNames = tagNames
    else:
        hereTagNames = [tag for tag in tagNames if tag.endswith(currentBranch)]
    if debug:
        print("Current branch tags:", hereTagNames)
    thereTagNames = [tag for tag in allTagNames if tag.endswith(fromBranch)]
    if debug:
        print("From branch tags:", thereTagNames)
        print("Extensionless?", extensionless)
    # Rewrite each from-branch tag into the name its counterpart would carry on
    # the current branch, then keep the ones that really exist here.
    replacement = "" if extensionless else f"-{currentBranch}"
    hereTagSet = set(hereTagNames)
    matchedTags = [tag for tag in thereTagNames if tag.replace(f"-{fromBranch}", replacement) in hereTagSet]
    if debug:
        print("Found synced tags:", matchedTags)
    # `^{}` peels annotated tags down to the object they point at.
    resolved = [_resolveRev(tag + r"^{}") for tag in matchedTags]
    hashes = [rev for rev in resolved if rev is not None]
    if debug:
        print("Matched commit hashes:", hashes)
    return hashes


def _messageMergePoints(mergeCommentRegex:str, debug:bool) -> List[str]:
    """Return the resolvable revisions referenced by merge notes in the commit log."""
    reg = re.compile(mergeCommentRegex, re.IGNORECASE | re.MULTILINE)
    # Every record ends in four newlines and git adds one more between records,
    # so five consecutive newlines separate one commit's text from the next.
    commitMessages = _git("log", "--notes", "-i", r"--pretty=format:%H%n%s%b%N%n%n%n%n").split("\n\n\n\n\n")
    reObjs = [reg.search(commitMessage) for commitMessage in commitMessages]
    if debug:
        print("Merge commit messages (last 5):", commitMessages[:5])
        print("Raw matches", reObjs[:5])
    # Group 1 of the regex is the revision that was merged in at that time.
    resolved = [_resolveRev(reMatch.group(1)) for reMatch in reObjs if reMatch is not None]
    if debug:
        print("Matched messages", resolved[:5])
    return [rev for rev in resolved if rev is not None]


def _mostRecentRevision(revs:List[str], debug:bool) -> Optional[str]:
    """Return the revision in `revs` with the latest `Date:` header, or None if none could be dated."""
    tz = dt.timezone(-dt.timedelta(hours=8))
    mostRecent = dt.datetime.min.replace(tzinfo= tz)
    newest:Optional[str] = None
    for i, rev in enumerate(revs, 1):
        args = ["git", "log", rev, "-n", "1", "--date=iso8601-strict"]
        try:
            lines = subprocess.run(args, capture_output= True, text= True).stdout.strip().splitlines()
            try:
                ds = next(line for line in lines if line.lower().startswith("date"))
            except StopIteration:
                # Fall back to the usual position of the date header.
                ds = lines[2]
            commitDateStr = ds.split(" ").pop()
            if commitDateStr.endswith("Z"):
                # `fromisoformat` only understands the "Z" UTC suffix from Python 3.11 on.
                commitDateStr = commitDateStr[:-1] + "+00:00"
            commitDate = dt.datetime.fromisoformat(commitDateStr)
        except (IndexError, ValueError) as e:
            if debug:
                print(f"Error getting commit date for revision #{i} of {len(revs)} = {rev}: {e}")
                print(" ".join(args))
            if len(revs) == 1:
                # Nothing else to fall back on: surface the underlying problem.
                raise
            continue
        if newest is None or commitDate > mostRecent:
            mostRecent = commitDate
            newest = rev
    return newest


def waterfallMerge(fromBranch:str, message:Optional[str]= None, featureBranchLabel:Optional[str]= None, lastMergeID:Optional[str]= None, mergeCommentRegex:str= DEFAULT_MERGE_COMMENT_PREFIX, *, sign:bool= False, debug:bool = False, extensionlessBranches:Collection[str]= ("main", "master")):
    r"""
    Perform a waterfall merge from one branch to the current one.

    The commit points last joined are determined by looking at the following pool:

    - Tags of the branch to merge from whose counterpart is merged into the current branch. A tag
      belongs to the branch to merge from when its name ends with that branch name. Its counterpart
      is found by replacing `-<fromBranch>` with `-<currentBranch>`, or by dropping it when the
      current branch is in `extensionlessBranches`. For example, if we are on branch `test` merging
      branch `dev` in, tag `v1.1.10-dev` matches tag `v1.1.10-test` on the current branch and is a
      candidate last-merge point. These tags should otherwise match semver rules.
    - If a commit message contains "merge commit: <commit>", then the referenced commit is a
      candidate last-merge point. The string searched for is defined by the `mergeCommentRegex`
      parameter, which defaults to `merge (?:(?:in|from)(?:\s+[a-z_]+)?)?\s*(?:commit|tag):?\s*` +
      `COMMIT_TAG_REGEX`, where `COMMIT_TAG_REGEX` is a regex that matches commit hashes and semver tags.
    - If the pool above is empty, the merge base of the current branch and the branch to merge
      from is used as the last-merge point.

    The most recent reference is then used to create a new branch, merge the last-merge point and
    the branch to merge from into it, and then squash-merge that branch back into the current branch.

    Based on https://stackoverflow.com/a/79108334/1877527

    Parameters
    ----------
    fromBranch: str
        The branch to merge from.
    message: Optional[str]
        The commit message for the merge. If not provided, a default message is used. A
        "Merge from <fromBranch> commit: <hash>" note is always appended.
    featureBranchLabel: Optional[str]
        The feature label for the branch. If provided, does not complete the final squash-merge.
    lastMergeID: Optional[str]
        The last merge commit ID. If not provided, the above pool is used to find the most recent reference.
    mergeCommentRegex: str
        Regex to find merge comments; its first capture group must be the merged revision. An empty
        string disables the commit-message search. Defaults to `merge (?:(?:in|from)(?:\s+[a-z_]+)?)?\s*(?:commit|tag):?\s*((?:[a-f0-9]{7,40}|v(?:\d+\.)+\d+)(?:(?:[\-_\.][a-z0-9\.]+)|(?:a(?:lpha)?|b(?:eta)?|r(?:elease)?c(?:andidate)?)\d+)?)`.
    sign: bool, kwarg-only (default False)
        Sign the commit.
    debug: bool, kwarg-only (default False)
        Enable debug output. Debug mode is a dry run: a RuntimeError is raised just before the
        temporary branch would be created.
    extensionlessBranches: Collection[str], kwarg-only (default ["main", "master"])
        Branches whose tags carry no branch suffix. When the current branch is one of these, the
        `-<fromBranch>` suffix is dropped when looking for the counterpart of a from-branch tag.

    Raises
    ------
    ValueError
        If the current branch cannot be determined, equals `fromBranch`, `lastMergeID` cannot be
        resolved, or no last-merge point can be found.
    RuntimeError
        If `debug` is set (dry run), or if one of the merges on the temporary branch fails.
    subprocess.CalledProcessError
        If one of the remaining checked git commands (branch switch, squash-merge, commit, cleanup) fails.
    """
    currentBranch = _git("rev-parse", "--abbrev-ref", "HEAD")
    fromBranch = fromBranch.strip()
    if not currentBranch:
        raise ValueError("Error: Could not determine the current branch; is this a git repository?")
    if currentBranch == fromBranch:
        raise ValueError("Error: The current branch and the branch to merge from are the same.")
    if debug:
        print("Current branch:", currentBranch)
    if isinstance(lastMergeID, str) and len(lastMergeID) > 0:
        try:
            commitHash = _git("rev-parse", lastMergeID, check= True)
        except subprocess.CalledProcessError as e:
            raise ValueError(f"Error: {lastMergeID} is not a valid commit hash.") from e
        print("Merging from commit:", commitHash)
    else:
        # Pool 1: tags that exist on both branches
        matched:List[str] = _tagMergePoints(currentBranch, fromBranch, extensionlessBranches, debug)
        # Pool 2: revisions referenced by merge notes in commit messages
        if len(mergeCommentRegex) > 0:
            noted = _messageMergePoints(mergeCommentRegex, debug)
            validRevs = [rev for rev in noted if rev not in matched]
            if validRevs:
                if debug:
                    print("Found new valid revs with commit messages matching regex:", validRevs)
                matched += validRevs
            elif debug:
                if noted:
                    print("All matched revs are already in the list of matched revs.")
                else:
                    print("No valid revs found with commit messages matching regex.")
        if not matched:
            # Nothing recorded: fall back to the merge base of the two branches
            commitHash = _git("merge-base", currentBranch, fromBranch)
            if not commitHash:
                raise ValueError("Error: No valid commit hashes found.")
            print("No merge points found, using last merge base:", commitHash)
        else:
            newest = _mostRecentRevision(matched, debug)
            if newest is None:
                raise ValueError("Error: No valid commit hashes found.")
            commitHash = newest
            print("Most recent reference:", commitHash)
    # Now we have the most recent reference
    if not isinstance(message, str) or len(message) == 0:
        message = f"Merge `{fromBranch}` into `{currentBranch}`"
    # Record the merged head so the next waterfall merge can find it via `mergeCommentRegex`
    message += f"\n\nMerge from {fromBranch} commit: " + _git("log", "-n", "1", "--pretty=format:%H", fromBranch, check= True)
    if debug:
        print("Using commit message:", message)
    # Create a new branch
    branchPrefix = "waterfallMerge"
    if isinstance(featureBranchLabel, str) and len(featureBranchLabel) > 0:
        # count/flags are passed by keyword: positional use is deprecated since Python 3.13
        branchPrefix = "feature-" + re.sub("[^a-z0-9]", "-", featureBranchLabel, count= 0, flags= re.IGNORECASE | re.MULTILINE).strip("-")
        ABORT_BEFORE_SQUASH = True
    else:
        ABORT_BEFORE_SQUASH = False
    workingBranch = f"{branchPrefix}-{fromBranch}-into-{currentBranch}-{commitHash[:8]}"
    if debug:
        raise RuntimeError(f"Stopping before creating temporary branch `{workingBranch}` and subsequent merge.")
    subprocess.run(["git", "switch", "-c", workingBranch], check= True, stdout= subprocess.PIPE)
    # Merge in the last merge point, then the branch itself
    try:
        subprocess.run(["git", "merge", commitHash, "--no-edit"], check= True, stdout= subprocess.PIPE)
        subprocess.run(["git", "merge", fromBranch, "--no-edit"], check= True, stdout= subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        # Best effort: when the merge never started there is nothing to abort, and
        # a failing abort must not mask the original error.
        subprocess.run(["git", "merge", "--abort"], check= False, capture_output= True)
        raise RuntimeError(f"Merge failed -- you're left on the temporary branch {workingBranch} with the aborted merge state. You originated from {currentBranch}.") from e
    if ABORT_BEFORE_SQUASH:
        print(f"Stopping before switching branches back and squashing temporary branch `{workingBranch}` into `{currentBranch}`. You're still on `{workingBranch}`.")
        return
    subprocess.run(["git", "switch", currentBranch], check= True, stdout= subprocess.PIPE)
    subprocess.run(["git", "merge", "--squash", workingBranch], check= True, stdout= subprocess.PIPE)
    commitArgs = ["git", "commit"]
    if sign:
        commitArgs.append("-S")
    commitArgs += ["-m", message]
    subprocess.run(commitArgs, check= True, stdout= subprocess.PIPE)
    # Clean up
    subprocess.run(["git", "branch", "-D", workingBranch], check= True, stdout= subprocess.PIPE)
    # NOTE: currently unreachable, because debug mode raises before the temporary branch is created.
    if debug:
        print(f"Merged successfully and removed temporary merge branch {workingBranch}.")
if __name__ == "__main__":
    # Command-line entry point: parse the arguments and forward them to waterfallMerge
    import argparse
    parser = argparse.ArgumentParser(description= "Perform a waterfall merge from one branch to another.")
    parser.add_argument("fromBranch", type= str, help= "The branch to merge from.")
    parser.add_argument("-b", "--branch-label", type= str, default= None, help="The feature label for the branch. If provided, does not complete the final squash-merge.")
    parser.add_argument("lastMergeID", type= str, nargs= '?', default =None, help= "The last merge commit ID (optional).")
    parser.add_argument("-m", "--message", type= str, default= None, help="The commit message for the merge.")
    parser.add_argument("--mergeCommentRegex", type= str, default= DEFAULT_MERGE_COMMENT_PREFIX, help= "Regex to find merge comments.")
    parser.add_argument("-s", "--sign", action= "store_true", help= "Sign the commit.")
    parser.add_argument("--debug", action= "store_true", help= "Enable debug output.")
    parser.add_argument("--extensionlessBranches", type= str, nargs= '*', default= ["main", "master"], help= "Branches that are considered extensionless.")
    args = parser.parse_args()
    waterfallMerge(
        fromBranch= args.fromBranch,
        message= args.message,
        # argparse stores `--branch-label` as `branch_label`; without forwarding it the option has no effect
        featureBranchLabel= args.branch_label,
        lastMergeID= args.lastMergeID,
        mergeCommentRegex= args.mergeCommentRegex,
        sign= args.sign,
        debug= args.debug,
        extensionlessBranches= args.extensionlessBranches
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment