Skip to content

Instantly share code, notes, and snippets.

@tigerhawkvok
Last active January 15, 2025 18:06
Show Gist options
  • Save tigerhawkvok/b480daff38493cc6723a01fcec1cd0dd to your computer and use it in GitHub Desktop.
Save tigerhawkvok/b480daff38493cc6723a01fcec1cd0dd to your computer and use it in GitHub Desktop.
#!python3
# https://stackoverflow.com/a/79108334/1877527
# cSpell: words creatordate, extensionless, elease, andidate
from typing import Optional, List, Collection, Tuple, Final
import subprocess
import datetime as dt
import re
from pathlib import Path
# Building blocks of the reference pattern, kept separate so each piece can be read on its own.
# A reference is either a 7-40 character lowercase hex commit hash or a `v`-prefixed dotted version ...
_HASH_OR_VERSION = r"(?:[a-f0-9]{7,40}|v(?:\d+\.)+\d+)"
# ... optionally followed by a separator-led label (e.g. `-dev`) or a numbered pre-release marker (e.g. `rc1`).
_OPTIONAL_LABEL = r"(?:(?:[\-_\.][a-z0-9\.]+)|(?:a(?:lpha)?|b(?:eta)?|r(?:elease)?c(?:andidate)?)\d+)?" # cSpell: disable-line
# The whole reference is exposed as capture group 1.
COMMIT_TAG_REGEX = "".join(("(", _HASH_OR_VERSION, _OPTIONAL_LABEL, ")"))
# Default pattern for "merge [in|from [<name>]] commit|tag[:] <reference>" notes in commit messages.
DEFAULT_MERGE_COMMENT_PREFIX = "".join((r"merge (?:(?:in|from)(?:\s+[a-z_]+)?)?\s*(?:commit|tag):?\s*", COMMIT_TAG_REGEX))
# A sequence of argv-style command lines.
CommandListType = List[List[str]]
__all__ = ["waterfallMerge"]


def _gitStdout(*gitArgs:str, check:bool= False) -> str:
    """
    Run ``git <gitArgs>`` without a shell and return its stripped standard output.

    With `check` left false a failing git command simply yields whatever it printed
    (usually an empty string); with `check` true a non-zero exit raises
    `subprocess.CalledProcessError`.
    """
    return subprocess.run(["git", *gitArgs], capture_output= True, text= True, check= check).stdout.strip()


def _workingTreeIsDirty() -> bool:
    """Return True when ``git status --porcelain`` prints anything at all."""
    # No `shell=True` here: with an argument *list*, a POSIX shell would only receive "git"
    # as the command and silently drop "status --porcelain".
    status = subprocess.run(["git", "status", "--porcelain"], check= True, stdout= subprocess.PIPE, stderr= subprocess.PIPE)
    return len(status.stdout) != 0


def _processErrorText(error:subprocess.CalledProcessError) -> str:
    """Return the failed command's stderr, falling back to stdout, then to a placeholder."""
    for stream in (error.stderr, error.stdout):
        text = (stream or b"").decode("utf-8", errors= "replace").strip()
        if len(text) > 0:
            return text
    return "(No error message provided)"


def _removeFilesBlockingMerge(errorMessage:str, context:str, cause:BaseException, debug:bool) -> bool:
    """
    Delete the files listed in an "overwritten by merge" error message.

    The files are taken from the indented lines of `errorMessage`. Returns True when at
    least one file was removed (so the failed command is worth retrying) and False when the
    message is not an "overwritten by merge" error or lists no files. Raises
    `FileNotFoundError` (chained to `cause`, with `context` appended to the text) when a
    listed path is not a regular file.
    """
    if "overwritten by merge" not in errorMessage:
        return False
    badFiles = [line.strip() for line in errorMessage.splitlines() if line.startswith((" ", "\t"))]
    if debug:
        print(errorMessage.splitlines())
        print("Found bad files:", badFiles)
    for _file in badFiles:
        _filePath = Path(_file)
        if not _filePath.is_file():
            raise FileNotFoundError(f"ERROR: File {_file} is not a file, but is in the list of files to be overwritten{context}") from cause
        _filePath.unlink()
        if debug:
            print("Removed file", _filePath.resolve())
    return len(badFiles) > 0


def waterfallMerge(fromBranch:str, message:Optional[str]= None, featureBranchLabel:Optional[str]= None, lastMergeID:Optional[str]= None, mergeCommentRegex:str= DEFAULT_MERGE_COMMENT_PREFIX, *, sign:bool= False, debug:bool = False, extensionlessBranches:Collection[str]= ("main", "master")) -> None:
    r"""
    Perform a waterfall merge from one branch into the current one.

    The commit points last joined are determined by looking at the following pool:

    - the most recent tag on the current branch that matches the name of the branch to merge from. If the current branch is in the list of `extensionlessBranches`, then the tag name must match exactly. Otherwise, the tag name must end with the name of the branch to merge from. For example, if we are on branch `test` merging branch `dev` in, tag `v1.1.10-dev` would match tag `v1.1.10-test` on the current branch, and would be a candidate last-merge point. These tags should otherwise match semver rules.
    - If the commit note message contains "merge commit: <commit>", then the referenced commit is a candidate last-merge point. The other commit is the one that contains the merge note. The string searched for is defined by the `mergeCommentRegex` parameter, which defaults to `merge (?:(?:in|from)(?:\s+[a-z_]+)?)?\s*(?:commit|tag):?\s*` + `COMMIT_TAG_REGEX`, where `COMMIT_TAG_REGEX` is a regex that matches commit hashes and semver tags.
    - If the pool of above is empty, the merge base of the current branch and the branch to merge from is used as the last-merge point.

    The most recent reference is then used to create a new branch, merge the last-merge point and the branch to merge from into it, and then squash-merge that branch back into the current branch.

    A dirty working directory is stashed (``git stash -u``) before anything else happens; the stash is not restored by this function.

    Based on https://stackoverflow.com/a/79108334/1877527

    Parameters
    ----------
    fromBranch: str
        The branch to merge from.
    message: Optional[str]
        The commit message for the merge. If not provided, a default message is used. A "Merge from <fromBranch> commit: <hash>" note is always appended.
    featureBranchLabel: Optional[str]
        The feature label for the branch. If provided, does not complete the final squash-merge.
    lastMergeID: Optional[str]
        The last merge commit ID. If not provided, the above pool is used to find the most recent reference.
    mergeCommentRegex: str
        Regex to find merge comments. Defaults to `merge (?:(?:in|from)(?:\s+[a-z_]+)?)?\s*(?:commit|tag):?\s*((?:[a-f0-9]{7,40}|v(?:\d+\.)+\d+)(?:(?:[\-_\.][a-z0-9\.]+)|(?:a(?:lpha)?|b(?:eta)?|r(?:elease)?c(?:andidate)?)\d+)?)`. An empty string disables the commit-message search.
    sign: bool, kwarg-only (default False)
        Sign the commit.
    debug: bool, kwarg-only (default False)
        Enable debug output. This is a dry run: the planned commands are reported through a `RuntimeError` and nothing is merged.
    extensionlessBranches: Collection[str], kwarg-only (default ["main", "master"])
        When searching tags on these branches, the tag name must match exactly. Otherwise, the tag name must end with the name of the branch to merge from.

    Raises
    ------
    ValueError
        If the current branch equals `fromBranch`, if `lastMergeID` cannot be resolved, or if no candidate merge point yields a usable commit date.
    RuntimeError
        If the working directory is still dirty after stashing, if `debug` is set (dry run), if a merge step fails, or if the working directory is dirty after the merges.
    FileNotFoundError
        If git reports a path as "overwritten by merge" that is not a regular file.
    subprocess.CalledProcessError
        If one of the git commands that must succeed (status, stash, log, rev-parse, checkout) fails.
    """
    currentBranch = _gitStdout("rev-parse", "--abbrev-ref", "HEAD")
    fromBranch = fromBranch.strip()
    # Make sure we're not a no-op
    if currentBranch == fromBranch:
        raise ValueError("Error: The current branch and the branch to merge from are the same.")
    # Make sure we're clean
    if _workingTreeIsDirty():
        subprocess.run(["git", "stash", "-u"], check= True)
        if _workingTreeIsDirty():
            raise RuntimeError("Error: Working directory is not clean. Commit and stash all changes before proceeding; untracked files that conflict may be overwritten.")
        print("Stashed dirty state")
    if debug:
        print("Current branch:", currentBranch)
    # Both are initialised up front so that the explicit `lastMergeID` path below does not
    # leave `thisBranchLastMerge` unbound when it is read later on.
    commitHash = None
    thisBranchLastMerge:Optional[str] = None
    if isinstance(lastMergeID, str) and len(lastMergeID) > 0:
        try:
            commitHash = _gitStdout("rev-parse", lastMergeID, check= True)
        except subprocess.CalledProcessError as e:
            raise ValueError(f"Error: {lastMergeID} is not a valid commit hash.") from e
        print("Merging from commit:", commitHash)
    else:
        # Pairs of (commit on `fromBranch`, matching commit on the current branch)
        matched:List[Tuple[str, str]] = []
        commitDate = None
        # Find the most recent tag name on the current branch
        tagNames = _gitStdout("tag", "-i", "--sort=-creatordate", "--merged").splitlines()
        allTagNames = _gitStdout("tag", "-i", "--sort=-creatordate").splitlines()
        if len(tagNames) > 0:
            extensionless = currentBranch in extensionlessBranches
            if extensionless:
                hereTagNames = tagNames
            else:
                hereTagNames = [tag for tag in tagNames if tag.endswith(currentBranch)]
            if debug:
                print("Current branch tags:", hereTagNames)
            thereTagNames = [tag for tag in allTagNames if tag.endswith(fromBranch)]
            if debug:
                print("From branch tags:", thereTagNames)
                print("Extensionless?", extensionless)
            # Rewrite each `fromBranch` tag into the name its counterpart would carry on this branch
            transformedTags = [tag.replace(f"-{fromBranch}", "" if extensionless else f"-{currentBranch}") for tag in thereTagNames]
            matchedTags:List[Tuple[str, str]] = [(tag, transformed) for transformed, tag in zip(transformedTags, thereTagNames) if transformed in hereTagNames]
            if debug:
                print("Found synced tags:", matchedTags)
            # `^{}` peels annotated tags down to the object they point at
            matched += [(_gitStdout("rev-parse", thereTag + r"^{}"), _gitStdout("rev-parse", hereTag + r"^{}")) for thereTag, hereTag in matchedTags]
            if debug:
                print("Matched commit hashes:", matched)
        # Find ones with merge comments
        if len(mergeCommentRegex) > 0:
            reg = re.compile(mergeCommentRegex, re.IGNORECASE | re.MULTILINE)
            # Each log entry is "<hash>\n<subject><body><notes>" followed by a run of newlines we can split on
            mergeCommitMessages = _gitStdout("log", "--notes", "-i", r"--pretty=format:%H%n%s%b%N%n%n%n%n").split("\n\n\n\n\n")

            def _revHelper(rev:Optional[str]) -> Optional[str]:
                # Resolve a reference to a hash, or None when git does not know it
                if not isinstance(rev, str):
                    return None
                try:
                    return _gitStdout("rev-parse", rev, check= True)
                except subprocess.CalledProcessError:
                    return None

            reObjs = [reg.search(commitMessage) for commitMessage in mergeCommitMessages]
            if debug:
                print("Merge commit messages (last 5):", mergeCommitMessages[:5])
                print("Raw matches", reObjs[:5])
            # group 1 is the _last_ merge point, the commit containing it is the one we want now
            containingCommit = [(reMatch.group(1), msg.strip().splitlines()[0]) for msg, reMatch in zip(mergeCommitMessages, reObjs) if reMatch is not None]
            _messageMatches = [(_revHelper(_there), _revHelper(_here)) for _there, _here in containingCommit]
            if debug:
                print("Matched messages", _messageMatches[:5])
            nn = [(revThere, revHere) for revThere, revHere in _messageMatches if revThere is not None and revHere is not None]
            validRevs = [rev for rev in nn if rev not in matched]
            if len(validRevs) > 0:
                if debug:
                    print("Found new valid revs with commit messages matching regex:", validRevs)
                matched += validRevs
            elif debug:
                if len(nn) > 0:
                    print("All matched revs are already in the list of matched revs.")
                else:
                    print("No valid revs found with commit messages matching regex.")
        if len(matched) > 0:
            # Pick the candidate with the most recent commit date
            tz = dt.timezone(-dt.timedelta(hours=8))
            mostRecent = dt.datetime.min.replace(tzinfo= tz)
            succeeded = False
            for i, revs in enumerate(matched, 1):
                rev, _thisBranchRev = revs
                args = ["git", "log", rev, "-n", "1", "--date=iso8601-strict"]
                try:
                    _lines = subprocess.run(args, capture_output= True, text= True).stdout.strip().splitlines()
                    # `next` signals "not found" with StopIteration, so ask for a default
                    # instead and fall back to the usual position of the date header.
                    ds = next((_line for _line in _lines if _line.lower().startswith("date")), None)
                    if ds is None:
                        ds = _lines[2]
                    commitDateStr = ds.split(" ").pop()
                    try:
                        commitDate = dt.datetime.fromisoformat(commitDateStr)
                    except ValueError:
                        # Keep only the date part when the full timestamp cannot be parsed
                        commitDate = dt.datetime.fromisoformat(commitDateStr.split("T").pop(0)).astimezone(tz)
                except Exception as e:
                    if debug:
                        print(f"Error getting commit date for revision #{i} of {len(matched)} = {rev}: {e}")
                        print(" ".join(args))
                    # With a single candidate there is nothing else to try
                    if len(matched) == 1:
                        raise
                else:
                    succeeded = True
                    if commitDate is not None and commitDate > mostRecent:
                        mostRecent = commitDate
                        commitHash = rev
                        thisBranchLastMerge = _thisBranchRev
            if not succeeded:
                raise ValueError("Error: No valid commit hashes found.")
        if commitHash is None or len(matched) == 0:
            # Assume the last merge was the most recent
            commitHash = _gitStdout("merge-base", currentBranch, fromBranch)
            # and the last time we merged onto this branch was the last merge point
            thisBranchLastMerge = commitHash
            print("No merge points found, using last merge base:", commitHash)
        print("Most recent reference:", commitHash)
    # Now we have the most recent reference
    if not isinstance(message, str) or len(message) == 0:
        message = f"Merge `{fromBranch}` into `{currentBranch}`"
    # This note is what `mergeCommentRegex` looks for on the next waterfall merge
    message += f"\n\nMerge from {fromBranch} commit: " + _gitStdout("log", "-n", "1", "--pretty=format:%H", fromBranch, check= True)
    if debug:
        print("Using commit message:", message)
    # Create a new branch
    branchPrefix = "waterfallMerge"
    hasFeatureLabel = isinstance(featureBranchLabel, str) and len(featureBranchLabel) > 0
    if hasFeatureLabel:
        branchPrefix = "feature-" + re.sub("[^a-z0-9]", "-", featureBranchLabel, count= 0, flags= re.IGNORECASE | re.MULTILINE).strip("-")
    ABORT_BEFORE_SQUASH:Final[bool] = hasFeatureLabel
    workingBranch = f"{branchPrefix}-{fromBranch}-into-{currentBranch}-{commitHash[:8]}"
    # Build operations
    if _gitStdout("rev-parse", "HEAD", check= True) == thisBranchLastMerge:
        if debug:
            print("Already at the last merge point on the current branch, we don't need to reset to a historical commit.")
        thisBranchLastMerge = None
    if thisBranchLastMerge is not None and debug:
        print("Resetting to last merge point on the current branch:", thisBranchLastMerge)
    ### Create the commands ###
    ## New branch merge operations ##
    newBranchMergeOps:CommandListType = [["git", "checkout", thisBranchLastMerge]] if thisBranchLastMerge is not None else []
    newBranchMergeOps += [
        ["git", "switch", "-f", "-c", workingBranch],
        ["git", "merge", commitHash, "-X", "theirs", "--no-edit"]
    ]
    if thisBranchLastMerge is not None:
        # Now that the previous merge point is caught up, we can catch up with commits on the current branch that happened
        # after the last merge point
        newBranchMergeOps.append(["git", "merge", currentBranch, "--no-edit"])
    # Since, by definition, this is a _waterfall_ merge, we're assuming upstream conflicts are canonical
    # so we use the `theirs` strategy to resolve conflicts
    newBranchMergeOps.append(["git", "merge", fromBranch, "-X", "theirs", "--no-edit"])
    ## Current branch merge resolution operations ##
    commitArgs = ["git", "commit"]
    if sign:
        commitArgs.append("-S")
    commitArgs += ["-m", message]
    currentBranchMergeBackOps:CommandListType = [
        ["git", "switch", "-f", currentBranch],
        ["git", "merge", "--squash", workingBranch],
        commitArgs,
        # Branch deletion may be added after the merge
        # actually occurs, depending on the state of the working directory
    ]
    # Human-readable queue of the commands still to run; its head is always the command in flight
    remaining = [' '.join(x) for x in [*newBranchMergeOps, *currentBranchMergeBackOps]]
    didWorkingDirCleanup:bool = False
    if debug:
        # NOTE: debug mode always stops here, so the `debug` checks further down only
        # matter if this dry-run stop is ever removed.
        pretty = "\n".join(remaining)
        raise RuntimeError(f"DEBUGGING: Stopping before creating temporary branch `{workingBranch}` and subsequent merge. \n\nPending operations: \n\n{pretty}")
    # Switch to a new branch and merge together the last merge point
    for _i, mergeOp in enumerate(newBranchMergeOps):
        try:
            subprocess.run(mergeOp, check= True, stdout= subprocess.PIPE, stderr= subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            pretty = "\n".join(remaining[1:])
            errorMessage = _processErrorText(e)
            messageFinalize = f"The most recent failed command was \n\n{remaining[0]}\n\n=> {errorMessage}\n\nAfter which remains: \n\n{pretty}"
            if _i == 0:
                cleanupMessage = f"-- you're left on the current branch. {messageFinalize}"
            else:
                cleanupMessage = f"-- you're left on the temporary branch {workingBranch} with the aborted merge state. You originated from {currentBranch}. {messageFinalize}"
            if _removeFilesBlockingMerge(errorMessage, f" {cleanupMessage}", e, debug):
                didWorkingDirCleanup = True
                # Try our operation again
                try:
                    subprocess.run(mergeOp, check= True, stdout= subprocess.PIPE, stderr= subprocess.PIPE)
                except subprocess.CalledProcessError as e2:
                    raise RuntimeError(f"ERROR: Could not fix up the merge after removing the offending files {cleanupMessage}") from e2
            else:
                try:
                    subprocess.run(["git", "merge", "--abort"], check= True, stdout= subprocess.PIPE, stderr= subprocess.PIPE)
                except subprocess.CalledProcessError:
                    # If we can't abort the merge we weren't really in one
                    pass
                raise RuntimeError(f"Merging histories failed {cleanupMessage}") from e
        # The command succeeded (possibly on the retry): drop it from the pending queue
        remaining.pop(0)
    ### State validation ###
    # Since we're waterfall-ing, we expect:
    # - the working branch to be clean
    # - our file state to match the `fromBranch` state
    if thisBranchLastMerge is None and not didWorkingDirCleanup:
        # We're clean, and we merged at the tip, so we can clean up the temporary branch
        # without worries
        _op = ["git", "branch", "-D", workingBranch]
        currentBranchMergeBackOps.append(_op)
        remaining.append(' '.join(_op))
    # This should be a no-op if the merges worked. Run without a shell so git itself receives
    # the `*` pathspec; `--` keeps the branch name from being mistaken for a path.
    subprocess.run(["git", "checkout", fromBranch, "--", "*"], check= True)
    if _workingTreeIsDirty():
        pretty = "\n".join(remaining)
        raise RuntimeError(f"ERROR: Working directory is not clean after merging {workingBranch} into {currentBranch}. Please resolve any further changes needed on this branch, after which these commands remain to be executed: \n\n{pretty}")
    if ABORT_BEFORE_SQUASH:
        pretty = "\n".join(remaining)
        print(f"Stopping before switching branches back and squashing temporary branch `{workingBranch}` into `{currentBranch}`. You're still on `{workingBranch}`.\n\nPending operations: \n\n{pretty}")
        return
    # Merge the branch back and clean up
    for postMergeOp in currentBranchMergeBackOps:
        try:
            subprocess.run(postMergeOp, check= True, stdout= subprocess.PIPE, stderr= subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            pretty = "\n".join(remaining[1:])
            errorMessage = _processErrorText(e)
            if _removeFilesBlockingMerge(errorMessage, "", e, debug):
                didWorkingDirCleanup = True
                # Try our operation again
                try:
                    subprocess.run(postMergeOp, check= True, stdout= subprocess.PIPE, stderr= subprocess.PIPE)
                except subprocess.CalledProcessError as e2:
                    raise RuntimeError("ERROR: Could not fix up the merge after removing the offending files") from e2
            else:
                raise RuntimeError(f"Fast-forward failed -- you're either on the temporary branch {workingBranch} or your destination branch {currentBranch}. The most recent failed command was \n\n{remaining[0]}\n\n=> {errorMessage}\n\nAfter which remains: \n\n{pretty}") from e
        remaining.pop(0)
    if didWorkingDirCleanup or thisBranchLastMerge is not None:
        reasons = []
        if didWorkingDirCleanup:
            reasons.append("cleaned up working directory")
        if thisBranchLastMerge is not None:
            reasons.append("did not start at the last merge point on the current branch.")
        reason = " and ".join(reasons)
        print(f"Not deleting temporary branch `{workingBranch}` because we {reason}")
    elif debug:
        print(f"Merged successfully and removed temporary merge branch {workingBranch}.")
if __name__ == "__main__":
    # Command-line entry point: parse the options and forward every one of them to `waterfallMerge`
    import argparse
    parser = argparse.ArgumentParser(description= "Perform a waterfall merge from one branch to another.")
    parser.add_argument("fromBranch", type= str, help= "The branch to merge from.")
    parser.add_argument("-b", "--branch-label", type= str, default= None, help="The feature label for the branch. If provided, does not complete the final squash-merge.")
    parser.add_argument("lastMergeID", type= str, nargs= '?', default =None, help= "The last merge commit ID (optional).")
    parser.add_argument("-m", "--message", type= str, default= None, help="The commit message for the merge.")
    parser.add_argument("--mergeCommentRegex", type= str, default= DEFAULT_MERGE_COMMENT_PREFIX, help= "Regex to find merge comments.")
    parser.add_argument("-s", "--sign", action= "store_true", help= "Sign the commit.")
    parser.add_argument("--debug", action= "store_true", help= "Enable debug output.")
    parser.add_argument("--extensionlessBranches", type= str, nargs= '*', default= ["main", "master"], help= "Branches that are considered extensionless.")
    args = parser.parse_args()
    waterfallMerge(
        fromBranch= args.fromBranch,
        message= args.message,
        # argparse stores `--branch-label` under `branch_label`; it was parsed but never passed on
        featureBranchLabel= args.branch_label,
        lastMergeID= args.lastMergeID,
        mergeCommentRegex= args.mergeCommentRegex,
        sign= args.sign,
        debug= args.debug,
        extensionlessBranches= args.extensionlessBranches
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment