Last active
January 15, 2025 18:06
-
-
Save tigerhawkvok/b480daff38493cc6723a01fcec1cd0dd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!python3
# https://stackoverflow.com/a/79108334/1877527
# cSpell: words creatordate, extensionless, elease, andidate
from typing import Optional, List, Collection, Tuple, Final
import subprocess
import datetime as dt
import re
from pathlib import Path

# Matches (and captures as group 1) either an abbreviated/full commit hash
# (7-40 hex characters) or a `v`-prefixed dotted version tag, optionally
# followed by a separator-delimited suffix (e.g. `-dev`) or a pre-release
# marker such as `a1`, `beta2`, `rc3`.
COMMIT_TAG_REGEX = r"((?:[a-f0-9]{7,40}|v(?:\d+\.)+\d+)(?:(?:[\-_\.][a-z0-9\.]+)|(?:a(?:lpha)?|b(?:eta)?|r(?:elease)?c(?:andidate)?)\d+)?)" # cSpell: disable-line

# Matches merge notes of the form "merge [in|from [<branch>]] commit|tag[:] <ref>";
# the referenced commit/tag is captured by the group from COMMIT_TAG_REGEX.
# The branch name part only accepts letters and underscores.
DEFAULT_MERGE_COMMENT_PREFIX = r"merge (?:(?:in|from)(?:\s+[a-z_]+)?)?\s*(?:commit|tag):?\s*" + COMMIT_TAG_REGEX

# A list of git command lines, each expressed as an argv-style list of strings.
CommandListType = List[List[str]]
__all__ = ["waterfallMerge"]


def _workingTreeIsDirty() -> bool:
    """Return True when `git status --porcelain` prints anything at all."""
    # The argv list is deliberately run WITHOUT `shell=True`: on POSIX, a list
    # combined with `shell=True` executes only its first item (a bare `git`)
    # and hands the remaining items to the shell itself.
    status = subprocess.run(["git", "status", "--porcelain"], check= True, stdout= subprocess.PIPE, stderr= subprocess.PIPE)
    return len(status.stdout.decode("utf-8")) != 0


def _describeFailure(error:subprocess.CalledProcessError) -> str:
    """Return the stripped stderr of a failed command, else its stdout, else a placeholder."""
    for stream in (error.stderr, error.stdout):
        if not stream:
            continue
        text = stream.decode("utf-8") if isinstance(stream, bytes) else str(stream)
        text = text.strip()
        if len(text) > 0:
            return text
    return "(No error message provided)"


def _removeFilesBlockingMerge(errorMessage:str, cause:BaseException, *, debug:bool= False, errorSuffix:str= "") -> bool:
    """
    Delete the files that a git error message lists as blocking the operation.

    The offending paths are taken to be the lines of `errorMessage` that start
    with a space or a tab. Paths are resolved relative to the current working
    directory.

    Parameters
    ----------
    errorMessage: str
        The text git printed when the command failed.
    cause: BaseException
        The exception to chain from if a listed path cannot be removed.
    debug: bool, kwarg-only (default False)
        Print the parsed message and every removed file.
    errorSuffix: str, kwarg-only (default "")
        Text appended verbatim to the FileNotFoundError message.

    Returns
    -------
    bool
        True if at least one file was listed (and removed), so a retry makes sense.

    Raises
    ------
    FileNotFoundError
        If a listed path is not an existing regular file.
    """
    badFiles = [line.strip() for line in errorMessage.splitlines() if line.startswith((" ", "\t"))]
    if debug:
        print(errorMessage.splitlines())
        print("Found bad files:", badFiles)
    for _file in badFiles:
        _filePath = Path(_file)
        if not _filePath.is_file():
            raise FileNotFoundError(f"ERROR: File {_file} is not a file, but is in the list of files to be overwritten{errorSuffix}") from cause
        _filePath.unlink()
        if debug:
            print("Removed file", _filePath.resolve())
    return len(badFiles) > 0


def waterfallMerge(fromBranch:str, message:Optional[str]= None, featureBranchLabel:Optional[str]= None, lastMergeID:Optional[str]= None, mergeCommentRegex:str= DEFAULT_MERGE_COMMENT_PREFIX, *, sign:bool= False, debug:bool = False, extensionlessBranches:Collection[str]= ("main", "master")) -> None:
    r"""
    Perform a waterfall merge from one branch into the current one.

    The commit points last joined are determined by looking at the following pool:

    - the most recent tag on the current branch that matches the name of the branch to merge from. If the current branch is in the list of `extensionlessBranches`, then the tag name must match exactly. Otherwise, the tag name must end with the name of the branch to merge from. For example, if we are on branch `test` merging branch `dev` in, tag `v1.1.10-dev` would match tag `v1.1.10-test` on the current branch, and would be a candidate last-merge point. These tags should otherwise match semver rules.
    - If the commit note message contains "merge commit: <commit>", then the referenced commit is a candidate last-merge point. The other commit is the one that contains the merge note. The string searched for is defined by the `mergeCommentRegex` parameter, which defaults to `merge (?:(?:in|from)(?:\s+[a-z_]+)?)?\s*(?:commit|tag):?\s*` + `COMMIT_TAG_REGEX`, where `COMMIT_TAG_REGEX` is a regex that matches commit hashes and semver tags.
    - If the pool above is empty, the merge base of the current branch and the branch to merge from is used as the last-merge point.

    The most recent reference is then used to create a new branch, merge the last-merge point and the branch to merge from into it, and then squash-merge that branch back into the current branch.

    Based on https://stackoverflow.com/a/79108334/1877527

    Notes
    -----
    - A dirty working tree is stashed with `git stash -u` before anything else happens; this function never pops that stash.
    - With `debug=True` the function is a dry run: it prints its findings and then raises a `RuntimeError` listing the pending git commands instead of executing them.

    Parameters
    ----------
    fromBranch: str
        The branch to merge from.
    message: Optional[str]
        The commit message for the merge. If not provided, a default message is used. A "Merge from <branch> commit: <hash>" note is always appended.
    featureBranchLabel: Optional[str]
        The feature label for the branch. If provided, does not complete the final squash-merge.
    lastMergeID: Optional[str]
        The last merge commit ID. If not provided, the above pool is used to find the most recent reference.
    mergeCommentRegex: str
        Regex to find merge comments. Defaults to `merge (?:(?:in|from)(?:\s+[a-z_]+)?)?\s*(?:commit|tag):?\s*((?:[a-f0-9]{7,40}|v(?:\d+\.)+\d+)(?:(?:[\-_\.][a-z0-9\.]+)|(?:a(?:lpha)?|b(?:eta)?|r(?:elease)?c(?:andidate)?)\d+)?)`.
    sign: bool, kwarg-only (default False)
        Sign the commit.
    debug: bool, kwarg-only (default False)
        Enable debug output (and stop before any branch is created, see Notes).
    extensionlessBranches: Collection[str], kwarg-only (default ("main", "master"))
        When searching tags on these branches, the tag name must match exactly. Otherwise, the tag name must end with the name of the branch to merge from.

    Raises
    ------
    ValueError
        If `fromBranch` is the current branch, if `lastMergeID` cannot be resolved, or if no candidate commit yields a date.
    RuntimeError
        If the working tree is still dirty after stashing, if a merge step fails, if the tree is dirty after merging, or (always) in debug mode.
    FileNotFoundError
        If git reports a blocking path that is not a regular file.
    subprocess.CalledProcessError
        If one of the git commands run with `check=True` outside the merge loops fails.
    """
    currentBranch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], capture_output=True, text=True).stdout.strip()
    fromBranch = fromBranch.strip()
    # Make sure we're not a no-op
    if currentBranch == fromBranch:
        raise ValueError("Error: The current branch and the branch to merge from are the same.")
    # Make sure we're clean
    if _workingTreeIsDirty():
        subprocess.run(["git", "stash", "-u"], check= True)
        if _workingTreeIsDirty():
            raise RuntimeError("Error: Working directory is not clean. Commit and stash all changes before proceeding; untracked files that conflict may be overwritten.")
        print("Stashed dirty state")
    if debug:
        print("Current branch:", currentBranch)
    # Commit on the *current* branch where the previous merge landed. It stays None
    # when the caller names the merge point explicitly, in which case the
    # temporary branch simply starts from the current HEAD.
    thisBranchLastMerge:Optional[str] = None
    commitHash:Optional[str] = None
    if isinstance(lastMergeID, str) and len(lastMergeID) > 0:
        try:
            commitHash = subprocess.run(["git", "rev-parse", lastMergeID], capture_output=True, text=True, check= True).stdout.strip()
        except subprocess.CalledProcessError as e:
            raise ValueError(f"Error: {lastMergeID} is not a valid commit hash.") from e
        else:
            print("Merging from commit:", commitHash)
    else:
        # Pairs of (commit on `fromBranch`, corresponding commit on the current branch)
        matched:List[Tuple[str, str]] = []
        # Find the most recent tag name on the current branch
        tagNames = subprocess.run(["git", "tag", "-i", "--sort=-creatordate", "--merged"], capture_output=True, text=True).stdout.strip().splitlines()
        allTagNames = subprocess.run(["git", "tag", "-i", "--sort=-creatordate"], capture_output=True, text=True).stdout.strip().splitlines()
        if len(tagNames) > 0:
            extensionless = currentBranch in extensionlessBranches
            if extensionless:
                hereTagNames = tagNames
            else:
                hereTagNames = [tag for tag in tagNames if tag.endswith(currentBranch)]
            if debug:
                print("Current branch tags:", hereTagNames)
            thereTagNames = [tag for tag in allTagNames if tag.endswith(fromBranch)]
            if debug:
                print("From branch tags:", thereTagNames)
                print("Extensionless?", extensionless)
            # Rewrite each `fromBranch` tag into the name its counterpart would carry here
            replacement = "" if extensionless else f"-{currentBranch}"
            transformedTags = [tag.replace(f"-{fromBranch}", replacement) for tag in thereTagNames]
            matchedTags:List[Tuple[str, str]] = [(tag, transformed) for transformed, tag in zip(transformedTags, thereTagNames) if transformed in hereTagNames]
            if debug:
                print("Found synced tags:", matchedTags)
            # `^{}` peels annotated tags down to the object they point at
            matched += [
                (
                    subprocess.run(["git", "rev-parse", thereTag + r"^{}"], capture_output=True, text=True).stdout.strip(),
                    subprocess.run(["git", "rev-parse", hereTag + r"^{}"], capture_output=True, text=True).stdout.strip(),
                )
                for thereTag, hereTag in matchedTags
            ]
            if debug:
                print("Matched commit hashes:", matched)
        # Find ones with merge comments
        if len(mergeCommentRegex) > 0:
            reg = re.compile(mergeCommentRegex, re.IGNORECASE | re.MULTILINE)
            # Each record is "<hash>\n<subject><body><notes>"; records are split on the run of five newlines
            mergeCommitMessages = subprocess.run(["git", "log", "--notes", "-i", r"--pretty=format:%H%n%s%b%N%n%n%n%n"], capture_output=True, text=True).stdout.strip().split("\n\n\n\n\n")

            def _revHelper(rev:Optional[str]) -> Optional[str]:
                """Resolve `rev` with `git rev-parse`; None if it is not a string or cannot be resolved."""
                if not isinstance(rev, str):
                    return None
                try:
                    return subprocess.run(["git", "rev-parse", rev], capture_output=True, text=True, check= True).stdout.strip()
                except subprocess.CalledProcessError:
                    return None

            reObjs = [reg.search(commitMessage) for commitMessage in mergeCommitMessages]
            if debug:
                print("Merge commit messages (last 5):", mergeCommitMessages[:5])
                print("Raw matches", reObjs[:5])
            # group 1 is the _last_ merge point, the commit containing it is the one we want now
            containingCommit = [(_reMatch.group(1), msg.strip().splitlines()[0]) for msg, _reMatch in zip(mergeCommitMessages, reObjs) if _reMatch is not None]
            _messageMatches = [(_revHelper(_there), _revHelper(_here)) for _there, _here in containingCommit]
            if debug:
                print("Matched messages", _messageMatches[:5])
            nn = [(revThere, revHere) for revThere, revHere in _messageMatches if revThere is not None and revHere is not None]
            validRevs = [rev for rev in nn if rev not in matched]
            if len(validRevs) > 0:
                if debug:
                    print("Found new valid revs with commit messages matching regex:", validRevs)
                matched += validRevs
            elif debug:
                if len(nn) > 0:
                    print("All matched revs are already in the list of matched revs.")
                else:
                    print("No valid revs found with commit messages matching regex.")
        if len(matched) > 0:
            tz = dt.timezone(-dt.timedelta(hours=8))
            mostRecent = dt.datetime.min.replace(tzinfo= tz)
            succeeded = False
            for i, (rev, _thisBranchRev) in enumerate(matched, 1):
                # Reset per candidate so a failed lookup never reuses the previous candidate's date
                commitDate = None
                args = ["git", "log", rev, "-n", "1", "--date=iso8601-strict"]
                try:
                    _lines = subprocess.run(args, capture_output= True, text= True).stdout.strip().splitlines()
                    # `next` signals exhaustion with StopIteration, so use a default to reach the fallback
                    ds = next((_line for _line in _lines if _line.lower().startswith("date")), None)
                    if ds is None:
                        ds = _lines[2]
                    commitDateStr = ds.split(" ").pop()
                    try:
                        commitDate = dt.datetime.fromisoformat(commitDateStr)
                    except ValueError:
                        # Fall back to the date part only
                        commitDate = dt.datetime.fromisoformat(commitDateStr.split("T").pop(0)).astimezone(tz)
                except Exception as e:
                    # Best effort: one unreadable candidate should not sink the others
                    if debug:
                        print(f"Error getting commit date for revision #{i} of {len(matched)} = {rev}: {e}")
                        print(" ".join(args))
                    if len(matched) == 1:
                        raise
                else:
                    succeeded = True
                if commitDate is not None and commitDate > mostRecent:
                    mostRecent = commitDate
                    commitHash = rev
                    thisBranchLastMerge = _thisBranchRev
            if not succeeded:
                raise ValueError("Error: No valid commit hashes found.")
        if commitHash is None or len(matched) == 0:
            # Assume the last merge was the most recent
            commitHash = subprocess.run(["git", "merge-base", currentBranch, fromBranch], capture_output=True, text=True).stdout.strip()
            # and the last time we merged onto this branch was the last merge point
            thisBranchLastMerge = commitHash
            print("No merge points found, using last merge base:", commitHash)
    print("Most recent reference:", commitHash)
    # Now we have the most recent reference
    if not isinstance(message, str) or len(message) == 0:
        message = f"Merge `{fromBranch}` into `{currentBranch}`"
    # Record the merged tip so the next run can find it through `mergeCommentRegex`
    message += f"\n\nMerge from {fromBranch} commit: " + subprocess.run(["git", "log", "-n", "1", """--pretty=format:%H""", fromBranch], capture_output= True, text= True, check= True).stdout.strip()
    if debug:
        print("Using commit message:", message)
    # Create a new branch
    ABORT_BEFORE_SQUASH:Final[bool] = isinstance(featureBranchLabel, str) and len(featureBranchLabel) > 0
    if ABORT_BEFORE_SQUASH:
        branchPrefix = "feature-" + re.sub("[^a-z0-9]", "-", featureBranchLabel, count= 0, flags= re.IGNORECASE | re.MULTILINE).strip("-")
    else:
        branchPrefix = "waterfallMerge"
    workingBranch = f"{branchPrefix}-{fromBranch}-into-{currentBranch}-{commitHash[:8]}"
    # Build operations
    if subprocess.run(["git", "rev-parse", "HEAD"], capture_output= True, text= True, check= True).stdout.strip() == thisBranchLastMerge:
        if debug:
            print("Already at the last merge point on the current branch, we don't need to reset to a historical commit.")
        thisBranchLastMerge = None
    if thisBranchLastMerge is not None and debug:
        print("Resetting to last merge point on the current branch:", thisBranchLastMerge)
    ### Create the commands ###
    ## New branch merge operations ##
    newBranchMergeOps:CommandListType = [["git", "checkout", thisBranchLastMerge]] if thisBranchLastMerge is not None else []
    newBranchMergeOps += [
        ["git", "switch", "-f", "-c", workingBranch],
        ["git", "merge", commitHash, "-X", "theirs", "--no-edit"]
    ]
    if thisBranchLastMerge is not None:
        # Now that the previous merge point is caught up, we can catch up with commits on the current branch that happened
        # after the last merge point
        newBranchMergeOps.append(["git", "merge", currentBranch, "--no-edit"])
    # Since, by definition, this is a _waterfall_ merge, we're assuming upstream conflicts are canonical
    # so we use the `theirs` strategy to resolve conflicts
    newBranchMergeOps.append(["git", "merge", fromBranch, "-X", "theirs", "--no-edit"])
    ## Current branch merge resolution operations ##
    commitArgs = ["git", "commit"]
    if sign:
        commitArgs.append("-S")
    commitArgs += ["-m", message]
    currentBranchMergeBackOps:CommandListType = [
        ["git", "switch", "-f", currentBranch],
        ["git", "merge", "--squash", workingBranch],
        commitArgs,
        # Branch deletion may be added after the merge
        # actually occurs, depending on the state of the working directory
    ]
    # Human-readable queue of the commands not yet completed; its head is always the command in flight
    remaining = [' '.join(x) for x in [*newBranchMergeOps, *currentBranchMergeBackOps]]
    didWorkingDirCleanup:bool = False
    if debug:
        pretty = "\n".join(remaining)
        raise RuntimeError(f"DEBUGGING: Stopping before creating temporary branch `{workingBranch}` and subsequent merge. \n\nPending operations: \n\n{pretty}")
    # Switch to a new branch and merge together the last merge point
    for _i, mergeOp in enumerate(newBranchMergeOps):
        try:
            subprocess.run(mergeOp, check= True, stdout= subprocess.PIPE, stderr= subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            pretty = "\n".join(remaining[1:])
            errorMessage = _describeFailure(e)
            messageFinalize = f"The most recent failed command was \n\n{remaining[0]}\n\n=> {errorMessage}\n\nAfter which remains: \n\n{pretty}"
            if _i == 0:
                cleanupMessage = f"-- you're left on the current branch. {messageFinalize}"
            else:
                cleanupMessage = f"-- you're left on the temporary branch {workingBranch} with the aborted merge state. You originated from {currentBranch}. {messageFinalize}"
            if "overwritten by merge" in errorMessage and _removeFilesBlockingMerge(errorMessage, e, debug= debug, errorSuffix= f" {cleanupMessage}"):
                didWorkingDirCleanup = True
                # Try our operation again
                try:
                    subprocess.run(mergeOp, check= True, stdout= subprocess.PIPE, stderr= subprocess.PIPE)
                except subprocess.CalledProcessError as e2:
                    raise RuntimeError(f"ERROR: Could not fix up the merge after removing the offending files {cleanupMessage}") from e2
                # That fixed it
                remaining.pop(0)
                continue
            # If we can't abort the merge we weren't really in one, so the exit status is ignored
            subprocess.run(["git", "merge", "--abort"], check= False, stdout= subprocess.PIPE, stderr= subprocess.PIPE)
            raise RuntimeError(f"Merging histories failed {cleanupMessage}") from e
        else:
            remaining.pop(0) # remove the first element to keep track of remaining ops
    ### State validation ###
    # Since we're waterfall-ing, we expect:
    # - the working branch to be clean
    # - our file state to match the `fromBranch` state
    if thisBranchLastMerge is None and not didWorkingDirCleanup:
        # We're clean, and we merged at the tip, so we can clean up the temporary branch
        # without worries
        _op = ["git", "branch", "-D", workingBranch]
        currentBranchMergeBackOps.append(_op)
        remaining.append(' '.join(_op))
    # This should be a no-op if the merges worked. The `*` is a git pathspec handled by git
    # itself (no shell involved); `--` keeps the branch name from being read as a path.
    subprocess.run(["git", "checkout", fromBranch, "--", "*"], check= True)
    if _workingTreeIsDirty():
        pretty = "\n".join(remaining)
        raise RuntimeError(f"ERROR: Working directory is not clean after merging {workingBranch} into {currentBranch}. Please resolve any further changes needed on this branch, after which these commands remain to be executed: \n\n{pretty}")
    if ABORT_BEFORE_SQUASH:
        pretty = "\n".join(remaining)
        print(f"Stopping before switching branches back and squashing temporary branch `{workingBranch}` into `{currentBranch}`. You're still on `{workingBranch}`.\n\nPending operations: \n\n{pretty}")
        return
    # Merge the branch back and clean up
    for postMergeOp in currentBranchMergeBackOps:
        try:
            subprocess.run(postMergeOp, check= True, stdout= subprocess.PIPE, stderr= subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            # The failed command is remaining[0]; only what follows it is still pending
            pretty = "\n".join(remaining[1:])
            # stdout is consulted too: e.g. `git commit` reports "nothing to commit" there
            errorMessage = _describeFailure(e)
            if "overwritten by merge" in errorMessage and _removeFilesBlockingMerge(errorMessage, e, debug= debug):
                didWorkingDirCleanup = True
                # Try our operation again
                try:
                    subprocess.run(postMergeOp, check= True, stdout= subprocess.PIPE, stderr= subprocess.PIPE)
                except subprocess.CalledProcessError as e2:
                    raise RuntimeError("ERROR: Could not fix up the merge after removing the offending files") from e2
                # That fixed it
                remaining.pop(0)
                continue
            raise RuntimeError(f"Fast-forward failed -- you're either on the temporary branch {workingBranch} or your destination branch {currentBranch}. The most recent failed command was \n\n{remaining[0]}\n\n=> {errorMessage}\n\nAfter which remains: \n\n{pretty}") from e
        else:
            remaining.pop(0)
    if didWorkingDirCleanup or thisBranchLastMerge is not None:
        reasons = []
        if didWorkingDirCleanup:
            reasons.append("cleaned up working directory")
        if thisBranchLastMerge is not None:
            reasons.append("did not start at the last merge point on the current branch.")
        reason = " and ".join(reasons)
        print(f"Not deleting temporary branch `{workingBranch}` because we {reason}")
    elif debug:
        print(f"Merged successfully and removed temporary merge branch {workingBranch}.")
if __name__ == "__main__":
    # Command-line entry point: parse the options and hand them to `waterfallMerge`
    import argparse
    parser = argparse.ArgumentParser(description= "Perform a waterfall merge from one branch to another.")
    parser.add_argument("fromBranch", type= str, help= "The branch to merge from.")
    parser.add_argument("-b", "--branch-label", type= str, default= None, help="The feature label for the branch. If provided, does not complete the final squash-merge.")
    parser.add_argument("lastMergeID", type= str, nargs= '?', default =None, help= "The last merge commit ID (optional).")
    parser.add_argument("-m", "--message", type= str, default= None, help="The commit message for the merge.")
    parser.add_argument("--mergeCommentRegex", type= str, default= DEFAULT_MERGE_COMMENT_PREFIX, help= "Regex to find merge comments.")
    parser.add_argument("-s", "--sign", action= "store_true", help= "Sign the commit.")
    parser.add_argument("--debug", action= "store_true", help= "Enable debug output.")
    parser.add_argument("--extensionlessBranches", type= str, nargs= '*', default= ["main", "master"], help= "Branches that are considered extensionless.")
    args = parser.parse_args()
    waterfallMerge(
        fromBranch= args.fromBranch,
        message= args.message,
        # argparse stores `--branch-label` under the attribute name `branch_label`
        featureBranchLabel= args.branch_label,
        lastMergeID= args.lastMergeID,
        mergeCommentRegex= args.mergeCommentRegex,
        sign= args.sign,
        debug= args.debug,
        extensionlessBranches= args.extensionlessBranches
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment