-
-
Save moretea/5728cf02e461fd20825b22fa6d41e8fe to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import click | |
import subprocess | |
import pathlib | |
from dataclasses import dataclass | |
import functools | |
import typing | |
import tabulate | |
import re | |
import json | |
import os | |
class AliasedGroup(click.Group):
    """Click group that also accepts abbreviated sub-command names.

    Lookup order:

    1. the exact command name;
    2. if the typed name contains ``-``: commands whose dash-separated
       segments start with exactly the typed segments (``c-b`` selects
       ``create-branch``);
    3. otherwise: commands whose name starts with the typed text.
    """

    def get_command(self, ctx, cmd_name):
        """Resolve *cmd_name* to a command, or ``None`` when nothing matches.

        An abbreviation that fits more than one command aborts through
        ``ctx.fail`` with the sorted list of candidates.
        """
        exact = super().get_command(ctx, cmd_name)
        if exact is not None:
            return exact

        available = self.list_commands(ctx)
        if "-" in cmd_name:
            # Compare the typed segments with the initials of each command.
            wanted = cmd_name.split("-")
            candidates = [
                name
                for name in available
                if [segment[0] for segment in name.split("-")] == wanted
            ]
        else:
            candidates = [name for name in available if name.startswith(cmd_name)]

        if not candidates:
            return None
        if len(candidates) > 1:
            ctx.fail('Too many matches: %s' % ', '.join(sorted(candidates)))
        return super().get_command(ctx, candidates[0])
@click.command(cls=AliasedGroup)
@click.pass_context
def main(ctx):
    """Git worktree tools"""
    # Every sub-command reads the repository from ctx.obj.
    repo = GitRepo.in_current_dir()
    ctx.obj = repo
@main.command()
@click.argument("QUERY")
@click.pass_context
def dir(ctx, query):
    """ Print directory of worktree"""
    # Case-insensitive substring match against each worktree's path
    # relative to the repository root.
    needle = query.lower()
    found = [
        wt for wt in ctx.obj.worktrees
        if needle in str(wt.rel_path).lower()
    ]
    if not found:
        ctx.fail("No match found")
    if len(found) > 1:
        # Join the paths as plain text; interpolating the list directly
        # would show the repr of every Path object to the user.
        candidates = ", ".join(sorted(str(wt.rel_path) for wt in found))
        ctx.fail(f"Too many matches found: {candidates}")
    print(ctx.obj.root / found[0].rel_path)
@main.command()
@click.pass_context
def ls(ctx):
    """ List worktrees """
    # One row per worktree; "-" stands in when no branch attribute is present.
    table = [
        (wt.attributes.get("branch", "-"), wt.rel_path, wt.head)
        for wt in ctx.obj.worktrees
    ]
    print(tabulate.tabulate(table, headers=["branch", "path", "sha"]))
@main.command()
@click.argument("BRANCH_NAME")
@click.option("--branch-from", default="origin/main")
@click.pass_context
def create_branch(ctx, branch_name, branch_from):
    """ Create a worktree based on branch name"""
    # ctx.obj is the GitRepo stored by the top-level group.
    repo = ctx.obj
    repo.create_worktree(branch_name, branch_from)
@main.command()
@click.argument("JIRA_TICKET")
@click.option("--kind", default="feat")
@click.option("--branch-from", default="origin/main")
@click.pass_context
def create_ticket(ctx, jira_ticket, kind, branch_from):
    """ Create a worktree based on a JIRA ticket"""
    # Ask the `jira` command line tool for the ticket as JSON.
    jira = subprocess.run(
        ["jira", "view", jira_ticket, "--template", "json"],
        check=True,
        capture_output=True,
        text=True,
    )
    ticket_info = json.loads(jira.stdout)
    nr = ticket_info["key"]
    summary = ticket_info["fields"]["summary"]

    # Turn the summary into a branch-name slug:
    #   * lowercase
    #   * drop everything that is not a letter or a space
    #   * join the remaining words with single dashes
    # Splitting on whitespace (instead of replacing each space) keeps removed
    # punctuation or repeated spaces from producing "--" or a dangling "-".
    words = re.sub(r"[^ a-z]+", "", summary.lower()).split()
    title = "-".join(words)
    branch_name = f"{kind}/{nr}-{title}" if title else f"{kind}/{nr}"

    print("Creating", branch_name, "from", branch_from)
    ctx.obj.fetch()
    ctx.obj.create_worktree(branch_name, branch_from)
@main.command()
@click.argument("PR_NUMBER")
@click.pass_context
def checkout_pr(ctx, pr_number):
    """ Checkout a PR """
    repo = ctx.obj
    # Look up the PR's head branch, then check out that existing branch
    # (tracked from origin) into its own worktree.
    head_branch = repo.branchname_for_pr(pr_number)
    repo.create_worktree(head_branch, f"origin/{head_branch}", existing_branch=True)
@dataclass
class GitRepo:
    """A git repository whose additional worktrees are created under ``<root>/wt``.

    Commands issued through ``_git`` and ``_gh_api`` run with ``root`` as
    their working directory.
    """

    # Repository root: cwd for git/gh and parent of the "wt" directory.
    root: pathlib.Path

    @classmethod
    def in_current_dir(cls):
        """Build a GitRepo for the repository containing the current directory.

        Raises:
            subprocess.CalledProcessError: if ``git rev-parse --git-dir`` fails.
        """
        git_dir = subprocess.run(
            ["git", "rev-parse", "--git-dir"],
            text=True,
            check=True,
            capture_output=True,
        ).stdout.strip()
        return cls(cls._root_from_git_dir(pathlib.Path(git_dir).absolute()))

    @staticmethod
    def _root_from_git_dir(git_dir):
        """Map the path printed by ``git rev-parse --git-dir`` to the repo root."""
        git_dir = pathlib.Path(git_dir)
        if git_dir.name == ".git":
            return git_dir.parent
        # Anything else is treated as the git dir of a linked worktree,
        # i.e. <root>/.git/worktrees/<name>, three levels below the root.
        return git_dir.parent.parent.parent

    def _git(self, *args):
        """Run ``git <args>`` in the repo root and return its stripped stdout.

        Raises:
            Exception: if git exits non-zero; the message carries git's stderr.
        """
        try:
            result = subprocess.run(
                ["git", *args],
                cwd=self.root,
                text=True,
                check=True,
                capture_output=True,
            )
        except subprocess.CalledProcessError as e:
            raise Exception(f"failed git op: {e.stderr}") from e
        return result.stdout.strip()

    def _gh_api(self, *args):
        """Run ``gh api <args>`` in the repo root and return the decoded JSON.

        Raises:
            Exception: if gh exits non-zero; the message carries gh's stderr.
        """
        try:
            result = subprocess.run(
                ["gh", "api", *args],
                cwd=self.root,
                text=True,
                check=True,
                capture_output=True,
            )
        except subprocess.CalledProcessError as e:
            raise Exception(f"failed gh api call: {e.stderr}") from e
        return json.loads(result.stdout.strip())

    def fetch(self, upstream="origin"):
        """Run ``git fetch`` for the remote *upstream*."""
        self._git("fetch", upstream)

    def create_worktree(self, branch_name, branch_from, existing_branch=False):
        """Add a worktree for *branch_name* at ``<root>/wt/<branch-name>``.

        Slashes in the branch name become dashes in the directory name.

        Args:
            branch_name: branch to check out in the new worktree.
            branch_from: start point for a newly created branch; unused when
                *existing_branch* is true.
            existing_branch: when true, check out an already existing branch
                (using ``--guess-remote``) instead of creating a new one.
        """
        self.fetch()
        self._git("worktree", "prune")
        path = self.root / "wt" / branch_name.replace("/", "-")
        if existing_branch:
            self._git("worktree", "add", "--guess-remote", str(path), branch_name)
        else:
            self._git("worktree", "add", "-b", branch_name, str(path), branch_from)
            # A freshly created branch is pushed so it gets an upstream on origin.
            self._git("push", "-u", "origin", branch_name)
        print(f"Checked out {branch_name} at {path}")

    def branchname_for_pr(self, pr_number):
        """Return the head branch name of pull request *pr_number*."""
        repo_path = self._gh_repo_path()
        return self._gh_api(f"/repos/{repo_path}/pulls/{pr_number}")["head"]["ref"]

    def _gh_repo_path(self):
        """Return ``owner/repo`` derived from the URL of the ``origin`` remote."""
        return self._parse_github_slug(self._git("remote", "get-url", "origin"))

    @staticmethod
    def _parse_github_slug(url):
        """Extract ``owner/repo`` from a remote URL.

        Accepts scp-style (``git@host:owner/repo.git``) as well as
        ``ssh://`` and ``https://`` URLs, with or without the ``.git`` suffix.

        Raises:
            ValueError: if no ``owner/repo`` pair can be found.
        """
        match = re.search(r"[:/]([^/:]+/[^/]+?)(?:\.git)?/?$", url.strip())
        if match is None:
            raise ValueError(f"cannot derive owner/repo from remote url: {url!r}")
        return match.group(1)

    @staticmethod
    def _parse_porcelain(output):
        """Parse ``git worktree list --porcelain`` text.

        Returns a list of ``(path, attributes)`` tuples, one per blank-line
        separated record. The first line of a record must be
        ``worktree <path>``; every following line is stored in *attributes*
        as ``key -> value``, or ``key -> True`` when the line has no value.

        Raises:
            ValueError: if a record does not start with a ``worktree`` line.
        """
        entries = []
        path = None
        attributes = {}
        for line in output.splitlines():
            if not line:
                # A blank line terminates the current record.
                if path is not None:
                    entries.append((path, attributes))
                path, attributes = None, {}
                continue
            # Split only once so that values (paths in particular) may
            # themselves contain spaces.
            key, sep, value = line.partition(" ")
            if path is None:
                if key != "worktree" or not sep:
                    raise ValueError(f"unexpected worktree list output: {line!r}")
                path = pathlib.Path(value)
            else:
                attributes[key] = value if sep else True
        if path is not None:
            entries.append((path, attributes))
        return entries

    @property
    def worktrees(self):
        """List of Worktree objects for every worktree git reports."""
        listing = self._git("worktree", "list", "--porcelain")
        return [
            Worktree(self, path, attributes)
            for path, attributes in self._parse_porcelain(listing)
        ]
@dataclass
class Worktree:
    """One worktree record parsed from ``git worktree list --porcelain``."""

    # Repository the worktree belongs to; only its ``root`` is read here.
    repo: "GitRepo"
    # Location of the worktree, taken from the record's ``worktree`` line.
    path: pathlib.Path
    # Remaining lines of the record; lines without a value are stored as True.
    attributes: typing.Dict[str, typing.Union[str, bool]]

    @property
    def rel_path(self):
        """Path relative to the repository root.

        Falls back to the unmodified path when the worktree does not live
        below the root, so listing worktrees does not fail on it.
        """
        try:
            return self.path.relative_to(self.repo.root)
        except ValueError:
            return self.path

    @property
    def head(self):
        """Value of the ``HEAD`` attribute, or ``None`` when absent."""
        return self.attributes.get("HEAD")
# Run the CLI only when executed as a script, not when the module is imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment