Skip to content

Instantly share code, notes, and snippets.

@NorthIsUp
Last active September 30, 2023 06:12
Show Gist options
  • Save NorthIsUp/1fa278c2ef8d5d6b3061acb744b1db7f to your computer and use it in GitHub Desktop.
Save NorthIsUp/1fa278c2ef8d5d6b3061acb744b1db7f to your computer and use it in GitHub Desktop.
tilt xbar menu
#!/usr/bin/env python3
# Metadata allows your plugin to show up in the app, and website.
#
# <xbar.title>Tilt Status</xbar.title>
# <xbar.version>v1.0</xbar.version>
# <xbar.author>Adam Hitchcock</xbar.author>
# <xbar.author.github>northisup</xbar.author.github>
# <xbar.desc>Control tilt for your project from the menubar.</xbar.desc>
# <xbar.dependencies>python</xbar.dependencies>
# <xbar.abouturl>https://gist.github.com/NorthIsUp/1fa278c2ef8d5d6b3061acb744b1db7f#file-tilt-status-15s-py</xbar.abouturl>
#
# <xbar.var>boolean(VAR_DEBUG=False): Debug output in the menu.</xbar.var>
# <xbar.var>boolean(VAR_DESCRIBE=True): Describe pods in the menu.</xbar.var>
# <xbar.var>string(VAR_PROJECT_DIR): Where the project is.</xbar.var>
# <xbar.var>string(VAR_TILTFILE=Tiltfile): Tiltfile to use (when inside project dir).</xbar.var>
# <xbar.var>string(VAR_TILT_BIN=/opt/homebrew/bin/tilt): Default tilt binary.</xbar.var>
# <xbar.var>string(VAR_TILT_HOST=Localhost): Host Tilt is running on.</xbar.var>
# <xbar.var>string(VAR_TILT_PORT=10350): Port Tilt is running on.</xbar.var>
from __future__ import annotations
import json
import logging
import re
import shlex
import subprocess
from collections import defaultdict
from contextlib import suppress
from dataclasses import dataclass, field
from enum import Enum
from functools import cached_property
from os.path import expanduser
from pathlib import Path
from shlex import quote
from typing import (
Any,
ClassVar,
Generator,
Iterable,
Optional,
Protocol,
Sequence,
TypeVar,
Union,
get_type_hints,
)
# Module-wide logger: the root logger, configured to emit DEBUG and above with a
# "=====> " prefix on every message.
logger = logging.getLogger()
logging.basicConfig(level=logging.DEBUG, format="=====> %(message)s")
# Tri-state flag type used for MenuItem options: True/False are rendered, None
# means "not set" and the option is omitted.
BoolNone = Optional[bool]
# Type returned by every `render` method in this file: a generator of strings.
RenderableGenerator = Generator[str, None, None]
class Renderable(Protocol):
    """Structural type for anything that can emit menu lines."""

    def render(self, depth: int = 0) -> RenderableGenerator:
        """Yield the rendered lines of this object at nesting level ``depth``."""
# ---------------------------------------------------------------------------- #
# config #
# ---------------------------------------------------------------------------- #
@dataclass
class Config:
    """Plugin settings, loaded from ``<this file>.vars.json`` when it exists.

    Each public field ``X`` is read from the JSON key ``VAR_X`` and converted
    to the field's annotated type. Problems found while loading are collected
    as errors/warnings and shown in the menu by :meth:`render`.
    """

    DEBUG: bool = False
    DESCRIBE: bool = True
    TILTFILE: Path = Path("Tiltfile")
    TILT_BIN: Path = Path("/opt/homebrew/bin/tilt")
    TILT_HOST: str = "localhost"
    TILT_PORT: int = 10350
    PROJECT_DIR: Path = Path()

    _errors: list[str] = field(init=False, default_factory=list)
    _warnings: list[str] = field(init=False, default_factory=list)

    @staticmethod
    def _coerce(hint: Any, val: Any) -> Any:
        """Convert a raw JSON value to the type named by ``hint``."""
        if hint is bool:
            # bool("false") is True, so textual booleans need explicit parsing.
            if isinstance(val, str):
                return val.strip().lower() in ("1", "true", "yes", "on")
            return bool(val)
        return hint(val)

    def __post_init__(self):
        """Overlay values from the vars file on the defaults and validate paths."""
        config_path = Path(__file__ + ".vars.json")
        if not config_path.exists():
            self.warn(f"{config_path.name} is missing, using defaults")
            # NOTE(review): the extracted source lost its indentation; this reads
            # the per-field loop as belonging to the "file exists" branch — confirm.
            return

        config = json.loads(config_path.read_text())
        hints = get_type_hints(Config)

        for k in self.__dataclass_fields__:
            # Private bookkeeping fields (_errors, _warnings) are not settings.
            if k.startswith("_"):
                continue

            val = config.get(f"VAR_{k}")
            # Only a missing or empty value means "unset"; False and 0 are
            # legitimate settings and must override the default.
            if val is not None and val != "":
                setattr(self, k, self._coerce(hints[k], val))
            elif self.DEBUG:
                self.warn(f"{k} is not set, using `{getattr(self, k)}`")

            if isinstance(v := getattr(self, k), Path):
                setattr(self, k, v := v.expanduser())
                if not v.exists():
                    self.error(f"{k} does not exist at {v}")

            if self.DEBUG:
                logger.debug(f"{k}: {getattr(self, k)}")

    def render(self, depth: int = 0) -> RenderableGenerator:
        """Yield menu lines for collected errors and warnings.

        When ``DEBUG`` is on, a "Vars" entry listing the current settings is
        yielded as well, one level deeper than ``depth``.
        """
        depth_prefix = f"{'--' * depth}"
        for title, color, prefix, messages in (
            ("errors", "red", "❌", self._errors),
            ("warnings", "yellow", "⚠️", self._warnings),
        ):
            if messages:
                yield from Divider().render(depth)
                yield from MenuItem(title, color=color).render(depth)
                for msg in sorted(messages):
                    yield from MenuItem(f"{depth_prefix} {prefix} {msg}").render(depth)

        if self.DEBUG:
            # The rendered lines must be yielded; building the generator alone
            # produces no output.
            yield from MenuItem("Vars").with_submenu(
                MenuItem(f"{k}: {getattr(self, k)}")
                for k in self.__dataclass_fields__
                if not k.startswith("_")
            ).render(depth + 1)
            # yield Cmd(f"📝 Edit Vars", f"open '{__file__}.vars.json'", depth=2)

    def error(self, msg: str):
        """Record an error message to be shown in the menu."""
        self._errors.append(msg)

    def warn(self, msg: str):
        """Record a warning message to be shown in the menu."""
        self._warnings.append(msg)
# Module-level settings instance shared by the menu classes below; constructing
# it runs Config.__post_init__, which reads the vars file at import time.
CONFIG = Config()
# ---------------------------------------------------------------------------- #
# menu classes #
# ---------------------------------------------------------------------------- #
@dataclass
class Menu:
    """The complete plugin output: a title line followed by dropdown entries."""

    title: str
    items: list[Renderable] = field(default_factory=list, init=False)

    def render(self) -> Any:
        """Return the menu as a single newline-joined string.

        Order: title, the ``---`` separator, every item's lines, then the
        lines produced by ``CONFIG.render()``.
        """
        lines = [self.title, "---"]
        lines.extend(self._items())
        lines.extend(CONFIG.render())
        return "\n".join(lines)

    def _items(self) -> RenderableGenerator:
        """Yield the rendered lines of each item at depth 0."""
        for entry in self.items:
            for line in entry.render(depth=0):
                yield line

    def with_items(self, *items: Renderable | Iterable[Renderable]) -> Menu:
        """Append items (or iterables of items) and return this menu."""
        return with_something(self, self.items, *items)
@dataclass
class MenuItem:
    """One xbar menu line: a title plus ``key=value`` options.

    ``render`` yields the item's own line followed by its hook items, its
    submenu (one level deeper) and its siblings (same level).
    """

    title: str
    # Keyboard shortcut, e.g. shift+k. Use + to create combinations. Example
    # options: CmdOrCtrl, OptionOrAlt, shift, ctrl, super, tab, plus, return,
    # escape, f12, up, down, space.
    key: str = ""
    # When clicked, open this url.
    href: str = ""
    # Text color, e.g. common names like 'red' or hex colors (#ff0000).
    color: str = ""
    # Text font, e.g. font=UbuntuMono-Bold.
    font: str = ""
    # Text size, e.g. size=12.
    size: int = 0
    # Script to run when clicked; quote the path if it contains spaces.
    shell: str = ""
    # Extra arguments for the script, rendered as param1=, param2=, ...
    params: tuple[str, ...] = ()
    # =false starts the script without opening Terminal.
    terminal: BoolNone = None
    # =true refreshes the plugin after the item is clicked / its script ends.
    refresh: BoolNone = None
    # =false shows the line only in the status bar, not in the dropdown.
    dropdown: BoolNone = None
    # Truncate the line to this many characters.
    length: int = 0
    # Whether to trim leading/trailing whitespace from the title.
    trim: BoolNone = None
    # =true marks the line as the Option-key alternate of the previous one.
    alternate: BoolNone = None
    # Base64 template image (black and clear pixels only).
    templateImage: str = ""
    # Base64 image.
    image: str = ""
    # =false disables parsing of github style :mushroom: into emoji.
    emojize: BoolNone = None
    # =false turns off parsing of ANSI codes.
    ansi: BoolNone = None
    # =true greys out the line and disables clicking.
    disabled: BoolNone = None
    # Count of leading fields (title + the 18 options above) that map onto
    # xbar's line syntax; fields declared after this point are internal.
    magic_number: ClassVar[int] = 19
    # When false the item's own line is not rendered.
    only_if: bool = True
    submenu: list[Renderable] = field(default_factory=list, init=False)
    siblings: list[Renderable] = field(default_factory=list, init=False)

    @classmethod
    def _type_hint(cls, key: str, hints: dict[type, dict[str, type]] = {}):
        """Return the resolved annotation of field ``key``.

        The mutable default is deliberate: it memoises ``get_type_hints`` per
        class across calls.
        """
        if cls not in hints:
            hints[cls] = get_type_hints(cls, globals())
        return hints[cls][key]

    @property
    def is_divider(self) -> bool:
        """True when this item is a ``---`` separator."""
        return self.title == "---"

    def depth_prefix(self, depth: int = 0) -> str:
        """Return the ``--`` nesting prefix, with a trailing space for non-dividers."""
        return f"{'--' * depth}{' ' if depth and not self.is_divider else ''}"

    def _title(self, depth: int = 0) -> str:
        """Return the title with its nesting prefix."""
        return f"{self.depth_prefix(depth)}{self.title}"

    def subclass_render_hook(self) -> Generator[Renderable, None, None]:
        """Extra items rendered right after this item's line; none by default."""
        yield from ()

    def shell_params(self) -> Iterable[str]:
        """Return the command as shell-quoted tokens followed by ``params``."""
        if not self.shell:
            return ()
        shell, *params = [quote(_) for _ in shlex.split(self.shell)]
        return (shell, *params, *self.params)

    def menu_params(self) -> Iterable[tuple[str, Any]]:
        """Yield ``(name, value)`` for every option that has been set.

        ``shell`` and ``params`` are skipped because ``all_params`` renders
        them as ``shell=`` / ``paramN=``. Tri-state flags are included
        whenever they are not None, so an explicit False is still rendered.
        """
        option_names = list(MenuItem.__dataclass_fields__)[1 : self.magic_number]
        return (
            (k, v)
            for k, v in (
                (k, getattr(self, k))
                for k in option_names
                if k not in ("shell", "params")
            )
            if (self._type_hint(k) == BoolNone and v is not None) or v
        )

    def all_params(self) -> Iterable[str]:
        """Yield every ``key=value`` fragment that follows the title."""
        if shell_params := self.shell_params():
            shell, *shell_params = shell_params
            yield f"shell={shell}"
            yield from (f"param{i}={p}" for i, p in enumerate(shell_params, 1))
        yield from (f"{k}={quote(str(v))}" for k, v in self.menu_params())

    def render(self, depth: int = 0) -> RenderableGenerator:
        """Yield the lines for this item and everything attached to it."""
        # NOTE(review): the extracted source lost its indentation; this reads
        # the only_if guard as covering the item's line, hook items and
        # submenu, with siblings always rendered — confirm against the original.
        if self.only_if:
            yield " | ".join((self._title(depth), *self.all_params()))
            for item in self.subclass_render_hook():
                yield from item.render(depth)
            for item in self.submenu:
                yield from item.render(depth + 1)
        for item in self.siblings:
            yield from item.render(depth)

    def add_submenu(self, child: MenuItem) -> MenuItem:
        """Append one submenu item and return this item."""
        self.submenu.append(child)
        return self

    def with_submenu(self, *children: Renderable | Iterable[Renderable]) -> MenuItem:
        """Append submenu items (or iterables of them) and return this item."""
        return with_something(self, self.submenu, *children)

    def with_siblings(self, *children: Renderable | Iterable[Renderable]) -> MenuItem:
        """Append sibling items (or iterables of them) and return this item."""
        return with_something(self, self.siblings, *children)
@dataclass
class Divider(MenuItem):
    # Separator line: a MenuItem whose title defaults to "---", the value that
    # MenuItem.is_divider checks for.
    title: str = "---"
@dataclass
class ShellItem(MenuItem):
    """Menu item that runs a shell command, optionally from a working directory."""

    # Directory the command should run from; None means no directory change.
    cwd: Union[str, Path, None] = None

    def __init__(
        self,
        title: str,
        shell: str,
        cwd: Union[str, Path, None] = None,
        **kwargs: Any,
    ):
        # Hand-written initializer so `shell` can be passed positionally; the
        # remaining MenuItem options are forwarded as keyword arguments.
        super().__init__(title, shell=shell, **kwargs)
        self.cwd = cwd

    def __post_init__(self):
        # NOTE(review): nothing visible here calls __post_init__ — the custom
        # __init__ above takes the place of the dataclass-generated one and
        # MenuItem defines no __post_init__ — so this normalisation appears
        # not to run. Confirm before relying on it (or on the override in
        # TiltShellItem).
        if isinstance(self.cwd, str):
            self.cwd = Path(self.cwd)
        if self.cwd and not self.cwd.exists():
            CONFIG.error(f"❌ cwd does not exist at {self.cwd}")
        if self.shell and not self.params:
            # Split the command into its executable and shell-quoted arguments.
            self.shell, *self.params = (quote(_) for _ in shlex.split(self.shell))

    def shell_params(self, use_cwd: bool = True) -> Iterable[str]:
        # Same tokens as MenuItem.shell_params, prefixed with `cd <cwd> &&`
        # when a cwd is set and use_cwd is true.
        shell_params = super().shell_params()
        if use_cwd and self.cwd:
            shell_params = ("cd", expanduser(self.cwd), "&&", *shell_params)
        return shell_params

    def shell_str(self, use_cwd: bool = False) -> str:
        # The command as a single space-joined string.
        return " ".join(self.shell_params(use_cwd=use_cwd))

    def subclass_render_hook(self, depth: int = 0) -> Generator[Renderable, None, None]:
        # In DEBUG mode, show the command itself as a disabled line under the item.
        if CONFIG.DEBUG:
            yield MenuItem(
                f"╰─ {self.shell_str(use_cwd=False)}", font="Andale Mono", disabled=True
            )

    def run(self) -> str:
        # Execute the command directly (argument list, no shell) in `cwd` and
        # return its stripped stdout; check_output raises CalledProcessError
        # on a non-zero exit status.
        # NOTE(review): the tokens come from shell_params and are shell-quoted;
        # an argument containing spaces would reach the program with literal
        # quote characters — confirm this is acceptable for current callers.
        shell_params = list(self.shell_params(use_cwd=False))
        if CONFIG.DEBUG:
            logger.debug(f"running: {shell_params}")
        output = subprocess.check_output(shell_params, cwd=self.cwd)
        return output.decode("utf-8").strip()
class TiltShellItem(ShellItem):
    """ShellItem for tilt commands: swaps ``tilt`` for the configured binary."""

    def __post_init__(self):
        # use the project dir as the cwd for all tilt commands
        # NOTE(review): ShellItem defines its own __init__ and does not call
        # __post_init__, so this hook may never run — confirm.
        self.cwd = CONFIG.PROJECT_DIR
        return super().__post_init__()

    def shell_params(self, use_cwd: bool = True) -> Iterable[str]:
        """Yield the command tokens with ``tilt`` replaced by ``CONFIG.TILT_BIN``.

        Only a token that is exactly ``tilt`` or a path ending in ``/tilt`` is
        replaced. ``re.fullmatch`` is required here: ``re.match`` anchors only
        at the start, so arguments that merely begin with "tilt" (for example
        a resource called ``tilt-ui``) would be rewritten too.
        """
        tilt_bin = CONFIG.TILT_BIN.as_posix()
        return (
            tilt_bin if re.fullmatch(r"(tilt|.*/tilt)", cmd) else cmd
            for cmd in super().shell_params(use_cwd)
        )

    def shell_str(self, use_cwd: bool = False) -> str:
        """Return the command as typed, without the binary path substituted."""
        # don't interpolate `tilt` for prettier output
        return " ".join(super().shell_params(use_cwd=use_cwd))
# ---------------------------------------------------------------------------- #
# utility functions #
# ---------------------------------------------------------------------------- #
# Sentinel meaning "no default supplied" (None is a legitimate default value).
_no_default = object()
T = TypeVar("T")


def get_in(
    keys: str | list[str],
    coll: dict[object, object] | list[object],
    default: object = _no_default,
) -> Generator[object, None, None]:
    """Yield the value(s) found by walking ``coll`` along ``keys``.

    ``keys`` is a dotted string or a list of path segments. A ``"*"`` segment
    fans out over every index ``0..len(coll)-1`` of the current collection.
    When a lookup fails with KeyError, IndexError or TypeError, ``default`` is
    yielded in its place if one was given; otherwise nothing is yielded for
    that branch.
    """
    try:
        path = keys if not isinstance(keys, str) else keys.split(".")
        head, *rest = path
        if head == "*":
            for index in range(len(coll)):
                yield from get_in([index, *rest], coll, default)
        elif rest:
            yield from get_in(rest, coll[head], default)
        else:
            yield coll[head]
    except (KeyError, IndexError, TypeError):
        if default is not _no_default:
            yield default
def strify(joiner: str, *args: Any) -> str:
    """Join the string forms of ``args`` with ``joiner``, skipping None values.

    A single non-string iterable argument (list, tuple, generator, ...) is
    expanded and its elements are joined. A single non-iterable argument such
    as None or an int is treated as an ordinary value instead of being
    unpacked, which would raise TypeError.
    """
    if (
        len(args) == 1
        and not isinstance(args[0], str)
        and isinstance(args[0], Iterable)
    ):
        return strify(joiner, *args[0])
    return joiner.join(str(arg) for arg in args if arg is not None)
def check_output(cmd: str, cwd: Union[str, Path, None] = None) -> str:
    """Run ``cmd`` through the shell and return its stripped stdout as text.

    The command is logged at DEBUG level first. ``subprocess.check_output``
    raises ``CalledProcessError`` when the command exits non-zero.
    """
    logger.debug(f"running: {cmd}")
    raw = subprocess.check_output(cmd, shell=True, cwd=cwd, encoding="utf-8")
    return raw.strip()
def with_something(
    ret: T,
    key: list[Renderable],
    *children: Renderable | Iterable[Renderable],
) -> T:
    """Add ``children`` to the list ``key`` and return ``ret``.

    Iterable children are flattened one level into the list; anything else is
    appended as a single element.
    """
    for child in children:
        key.extend(child if isinstance(child, Iterable) else (child,))
    return ret
# ---------------------------------------------------------------------------- #
# tilt classes #
# ---------------------------------------------------------------------------- #
class Status(str, Enum):
    """Resource status values, with the icon and color used to display them."""

    ERROR = "error"
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    OK = "ok"
    UNKNOWN = "unknown"
    NOT_APPLICABLE = "not_applicable"

    def __str__(self) -> str:
        """Return the plain value (e.g. ``"ok"``).

        Members are interpolated into menu titles with f-strings. Without an
        explicit ``__str__`` the formatted text of a str-mixin Enum depends on
        the Python version (the value on older releases, ``Status.OK`` after
        the enum formatting changes in 3.11), so it is pinned here.
        """
        return str(self.value)

    @property
    def icon(self) -> str:
        """Glyph shown next to resources in this status."""
        return {
            Status.OK: "✔",
            Status.PENDING: "↺",
            Status.IN_PROGRESS: "⌛︎",
            Status.UNKNOWN: "?",
            Status.NOT_APPLICABLE: "‒",
            Status.ERROR: "𝙭",
        }[self]

    @property
    def color(self):
        """Menu text color for this status."""
        return {
            Status.OK: "green",
            Status.PENDING: "yellow",
            Status.IN_PROGRESS: "green",
            Status.ERROR: "red",
            Status.UNKNOWN: "yellow",
            Status.NOT_APPLICABLE: "white",
        }[self]

    @classmethod
    def debug_menu(cls) -> Iterable[str]:
        """Return pre-rendered menu lines listing every status with its icon and color."""
        return (
            "-- status icons",
            *(f"---- {r.icon} {r.name} | color={r.color}" for r in cls),
        )
@dataclass
class Resource:
    """One item from ``tilt get uiresources`` output, with derived status fields."""

    kind: str = ""  # UIResource
    metadata: dict[str, object] = field(default_factory=dict)
    spec: dict[str, object] = field(default_factory=dict)
    status: dict[str, object] = field(default_factory=dict)
    # Top-level keys of the source dict that are not fields of this class.
    kwargs: dict[str, object] = field(default_factory=dict)

    name: str = field(init=False)
    runtime_status: Status = field(init=False)
    update_status: Status = field(init=False)

    @classmethod
    def description(cls, name: str, _cache: dict[str, str] = {}) -> str:
        """Return the ``tilt describe uiresource`` text for ``name`` ("" if unknown).

        The mutable default ``_cache`` is deliberate: the describe command is
        run once, when the cache is empty and ``CONFIG.DESCRIBE`` is on, and
        its output is split into per-resource sections keyed by resource name.
        """
        if CONFIG.DESCRIBE and not _cache:
            descriptions = TiltShellItem("", "tilt describe uiresource").run()
            for description in descriptions.split("\n\n\n"):
                if description.startswith("Name:"):
                    # First line is "Name: <resource>". Use a separate local so
                    # the requested `name` is not overwritten, and split on the
                    # first colon only so names containing ":" survive.
                    head = description.partition("\n")[0]
                    resource_name = head.split(":", 1)[1].strip()
                    _cache[resource_name] = description.strip()
        return _cache.get(name, "")

    def description_items(self) -> list[MenuItem]:
        """Return one monospace menu item per non-empty description line."""
        return [
            MenuItem(t, font="Andale Mono", trim=False)
            for t in self.description(self.name).split("\n")
            if t
        ]

    @staticmethod
    def _parse_status(raw: object) -> Status:
        """Map a raw status value to ``Status``; unrecognised values become UNKNOWN."""
        try:
            return Status(raw)
        except ValueError:
            return Status.UNKNOWN

    def __post_init__(self):
        """Derive ``name`` and both status fields from the raw dicts."""
        self.name = str(self.metadata.get("name", ""))
        # A status string this plugin does not know must not abort the whole menu.
        self.runtime_status = self._parse_status(
            self.status.get("runtimeStatus", "unknown")
        )
        self.update_status = self._parse_status(
            self.status.get("updateStatus", "unknown")
        )

    @classmethod
    def from_status_dict(cls, **kwargs: Any) -> Resource:
        """Build a Resource from one decoded JSON item.

        Keys matching constructor fields are passed through; every other key
        is collected into ``kwargs``. Derived fields (``init=False``) are never
        forwarded, since the constructor does not accept them.
        """
        init_names = {n for n, f in cls.__dataclass_fields__.items() if f.init}
        return cls(
            **{k: v for k, v in kwargs.items() if k in init_names},
            kwargs={k: v for k, v in kwargs.items() if k not in init_names},
        )

    @property
    def best_status(self) -> Status:
        """The status to display: runtime status unless the update status is notable."""
        if self.update_status in (Status.OK, Status.UNKNOWN):
            return self.runtime_status
        return self.update_status

    @property
    def reason(self) -> str:
        """Short explanation of an error or pending state, or "" when there is none."""
        if self.update_status is Status.ERROR:
            reasons = get_in("buildHistory.*.error", self.status)
            return f"errors: {strify(', ', reasons)}"
        elif self.runtime_status is Status.ERROR and (
            error := self.status.get("error")
        ):
            return f"error: {error}"
        elif self.best_status is Status.ERROR and (reason := self.status.get("reason")):
            return str(reason)
        elif self.best_status is Status.PENDING:
            reasons = get_in("waiting.on.*.name", self.status)
            msg = next(get_in("waiting.reason", self.status, "waiting")).replace(
                "-", " "
            )
            return f"{msg}: {strify(', ', reasons)}"
        return ""
@dataclass
class TiltStatus:
    """Queries the running tilt instance and builds the menu sections from it."""

    # Set to True by `uiresources` once tilt has answered a query.
    is_running: bool = False

    @cached_property
    def pid(self) -> Sequence[int]:
        """Process ids reported by ``pgrep tilt``; empty when pgrep fails."""
        try:
            pids = check_output("pgrep tilt").split("\n")
            logger.debug(f"pids: {pids}")
            return [int(pid) for pid in pids if pid]
        except subprocess.CalledProcessError:
            return ()

    @cached_property
    def uiresources(self) -> Sequence[Resource]:
        """Resources from ``tilt get uiresources``; empty when the command fails."""
        with suppress(subprocess.CalledProcessError):
            # Quote the binary path: the command string goes through a shell.
            response = check_output(
                f"{quote(str(CONFIG.TILT_BIN))} get uiresources --output=json"
            )
            self.is_running = True
            resources = json.loads(response).get("items", [])
            return [Resource.from_status_dict(**item) for item in resources]
        return []

    @cached_property
    def resource_status_groups(self) -> dict[Status, Sequence[Resource]]:
        """Resources grouped by ``best_status``, ordered as ``Status`` declares them."""
        groups: dict[Status, list[Resource]] = defaultdict(list)
        for resource in self.uiresources:
            groups[resource.best_status].append(resource)
        return {k: groups[k] for k in Status if k in groups}

    def title(self) -> MenuItem:
        """Menu-bar title: per-status counts, or "Stopped" when there are no resources."""
        if groups := self.resource_status_groups:
            tilt_icon = "◢"
            status_rollup = " ".join(f"{s.icon}{len(groups[s])}" for s in groups)
        else:
            tilt_icon = "🛑"
            status_rollup = "Stopped"
        return MenuItem(f"{tilt_icon} Tilt {status_rollup}")

    def _resource_item(self, r: Resource) -> MenuItem:
        """Build the menu entry, with its action submenu, for one resource."""
        # Quote once and reuse so trigger, logs and delete all treat the name
        # the same way.
        name = quote(r.name)
        return (
            MenuItem(
                f"{r.name}",
                href=f"http://{CONFIG.TILT_HOST}:{CONFIG.TILT_PORT}/r/{r.name}/overview",
            )
            .with_submenu(
                # Skip the reason row when there is no reason, rather than
                # rendering a blank line.
                MenuItem(
                    r.reason,
                    disabled=len(r.reason) > 10,
                    only_if=bool(r.reason),
                ),
                ShellItem("🔄 refresh", f"tilt trigger {name}", terminal=False),
                ShellItem(
                    "🪵 logs",
                    f"tilt logs --follow --resource {name}",
                    terminal=True,
                ),
                ShellItem("🗑️ delete", f"tilt delete {r.kind} {name}"),
                Divider(),
                *r.description_items(),
            )
            .with_siblings(
                MenuItem(
                    f"╰─ {r.reason}",
                    disabled=True,
                    length=40,
                    only_if=bool(r.reason) and CONFIG.DEBUG,
                )
            )
        )

    def resources_menu(self) -> Generator[Renderable, None, None]:
        """Yield one header item per status group, with its resources as siblings."""
        for grouping, group in self.resource_status_groups.items():
            # `.value` keeps the label ("ok", "error", ...) independent of how
            # the running Python formats Enum members.
            header = MenuItem(
                f"{grouping.icon} {grouping.value}", color=grouping.color
            )
            yield header.with_siblings(
                self._resource_item(r) for r in sorted(group, key=lambda r: r.name)
            )

    def _kill_items(self) -> Generator[Renderable, None, None]:
        """Yield a "kill <pid>" item for every tilt process that is still alive."""
        for pid in self.pid:
            try:
                command = check_output(f"ps -p {pid} -o command=").strip()
            except subprocess.CalledProcessError:
                # The process exited between pgrep and ps; nothing to kill.
                continue
            yield ShellItem(f"🧨 kill {pid}: {command}", f"kill {pid}", color="pink")

    def commands_menu(self) -> Sequence[Renderable]:
        """Return the "commands" submenu followed by the per-process kill items."""
        tilt_up = TiltShellItem("⬆️ up", "tilt up")
        return (
            Divider(),
            MenuItem("🧙‍♂️ commands").with_submenu(
                ShellItem(
                    "🌅 start-my-day",
                    f"open -a docker && ./scripts/start_my_day.sh && {tilt_up.shell_str()}",
                    cwd=CONFIG.PROJECT_DIR,
                    terminal=True,
                ),
                tilt_up,
                TiltShellItem("⬇️ down", "tilt down"),
                TiltShellItem("🪵 all logs", "tilt logs --follow"),
                Divider(),
                ShellItem(
                    "🧨 stop all", "killall tilt", disabled=not self.pid, color="pink"
                ),
            ),
            Divider(),
            *self._kill_items(),
        )
def main():
    """Build the menu from the current tilt state and print it for xbar."""
    ts = TiltStatus()

    # Compute the title first: it loads the resources, and loading them is
    # what sets `ts.is_running`, which the overview item reads next.
    menu_title = ts.title().title
    overview = MenuItem(
        "👁️ overview",
        href=f"http://{CONFIG.TILT_HOST}:{CONFIG.TILT_PORT}/overview",
        only_if=ts.is_running,
    )

    menu = Menu(menu_title).with_items(
        overview,
        *ts.commands_menu(),
        *ts.resources_menu(),
    )
    print(menu.render())


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment