Last active
September 30, 2023 06:12
-
-
Save NorthIsUp/1fa278c2ef8d5d6b3061acb744b1db7f to your computer and use it in GitHub Desktop.
tilt xbar menu
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Metadata allows your plugin to show up in the app, and website. | |
# | |
# <xbar.title>Tilt Status</xbar.title> | |
# <xbar.version>v1.0</xbar.version> | |
# <xbar.author>Adam Hitchcock</xbar.author> | |
# <xbar.author.github>northisup</xbar.author.github> | |
# <xbar.desc>Control tilt for your project from the menubar.</xbar.desc> | |
# <xbar.dependencies>python</xbar.dependencies> | |
# <xbar.abouturl>https://gist.github.com/NorthIsUp/1fa278c2ef8d5d6b3061acb744b1db7f#file-tilt-status-15s-py</xbar.abouturl> | |
# | |
# <xbar.var>boolean(VAR_DEBUG=False): Debug output in the menu.</xbar.var> | |
# <xbar.var>boolean(VAR_DESCRIBE=True): Describe pods in the menu.</xbar.var> | |
# <xbar.var>string(VAR_PROJECT_DIR): Where the project is.</xbar.var> | |
# <xbar.var>string(VAR_TILTFILE=Tiltfile): Tiltfile to use (when inside project dir).</xbar.var> | |
# <xbar.var>string(VAR_TILT_BIN=/opt/homebrew/bin/tilt): Default tilt binary.</xbar.var> | |
# <xbar.var>string(VAR_TILT_HOST=Localhost): Host Tilt is running on.</xbar.var> | |
# <xbar.var>string(VAR_TILT_PORT=10350): Port Tilt is running on.</xbar.var> | |
from __future__ import annotations | |
import json | |
import logging | |
import re | |
import shlex | |
import subprocess | |
from collections import defaultdict | |
from contextlib import suppress | |
from dataclasses import dataclass, field | |
from enum import Enum | |
from functools import cached_property | |
from os.path import expanduser | |
from pathlib import Path | |
from shlex import quote | |
from typing import ( | |
Any, | |
ClassVar, | |
Generator, | |
Iterable, | |
Optional, | |
Protocol, | |
Sequence, | |
TypeVar, | |
Union, | |
get_type_hints, | |
) | |
logger = logging.getLogger() | |
logging.basicConfig(level=logging.DEBUG, format="=====> %(message)s") | |
BoolNone = Optional[bool] | |
RenderableGenerator = Generator[str, None, None] | |
class Renderable(Protocol): | |
def render(self, depth: int = 0) -> RenderableGenerator: | |
... | |
# ---------------------------------------------------------------------------- # | |
# config # | |
# ---------------------------------------------------------------------------- # | |
@dataclass | |
class Config: | |
DEBUG: bool = False | |
DESCRIBE: bool = True | |
TILTFILE: Path = Path("Tiltfile") | |
TILT_BIN: Path = Path("/opt/homebrew/bin/tilt") | |
TILT_HOST: str = "localhost" | |
TILT_PORT: int = 10350 | |
PROJECT_DIR: Path = Path() | |
_errors: list[str] = field(init=False, default_factory=list) | |
_warnings: list[str] = field(init=False, default_factory=list) | |
def __post_init__(self): | |
config_path = Path(__file__ + ".vars.json") | |
if not config_path.exists(): | |
self.warn(f"{config_path.name} is missing, using defaults") | |
else: | |
config = json.loads(config_path.read_text()) | |
hints = get_type_hints(Config) | |
for k in self.__dataclass_fields__: | |
if k == "errors": | |
continue | |
if val := config.get(f"VAR_{k}"): | |
setattr(self, k, hints[k](val)) | |
elif self.DEBUG: | |
self.warn(f"{k} is not set, using `{getattr(self, k)}`") | |
if isinstance(v := getattr(self, k), Path): | |
setattr(self, k, v := v.expanduser()) | |
if not v.exists(): | |
self.error(f"{k} does not exist at {v}") | |
if self.DEBUG: | |
logger.debug(f"{k}: {getattr(self, k)}") | |
def render(self, depth: int = 0) -> RenderableGenerator: | |
depth_prefix = f"{'--' * depth}" | |
for title, color, preifx, messages in ( | |
("errors", "red", "❌", self._errors), | |
("warnings", "yellow", "⚠️", self._warnings), | |
): | |
if messages: | |
yield from Divider().render(depth) | |
yield from MenuItem(title, color=color).render(depth) | |
for msg in sorted(messages): | |
yield from MenuItem(f"{depth_prefix} {preifx} {msg}").render(depth) | |
if self.DEBUG: | |
MenuItem("Vars").with_submenu( | |
MenuItem(f"{k}: {getattr(self, k)}") for k in self.__dataclass_fields__ | |
).render(depth + 1) | |
# yield Cmd(f"📝 Edit Vars", f"open '{__file__}.vars.json'", depth=2) | |
def error(self, msg: str): | |
self._errors.append(msg) | |
def warn(self, msg: str): | |
self._warnings.append(msg) | |
CONFIG = Config() | |
# ---------------------------------------------------------------------------- # | |
# menu classes # | |
# ---------------------------------------------------------------------------- # | |
@dataclass | |
class Menu: | |
title: str | |
items: list[Renderable] = field(default_factory=list, init=False) | |
def render(self) -> Any: | |
return "\n".join( | |
( | |
self.title, | |
"---", | |
*self._items(), | |
*CONFIG.render(), | |
) | |
) | |
def _items(self) -> RenderableGenerator: | |
for item in self.items: | |
yield from item.render(depth=0) | |
def with_items(self, *items: Renderable | Iterable[Renderable]) -> Menu: | |
return with_something(self, self.items, *items) | |
@dataclass | |
class MenuItem: | |
title: str | |
key: str = "" # shift+k to add a key shortcut; Use + to create combinations; Example options: CmdOrCtrl, OptionOrAlt, shift, ctrl, super, tab, plus, return, escape, f12, up, down, space | |
href: str = "" # when clicked, open the url | |
color: str = ( | |
"" # change the text color. e.g. common colors 'red' and hex colors (#ff0000) | |
) | |
font: str = "" # change the text font. eg. font=UbuntuMono-Bold | |
size: int = 0 # change the text size. eg. size=12 | |
shell: str = "" # make the item run a given script terminal with your script e.g. shell=/Users/user/xbar_Plugins/scripts/nginx.restart.sh if there are spaces in the file path you will need quotes e.g. shell="/Users/user/xbar Plugins/scripts/nginx.restart.sh" (bash is also supported but is deprecated) | |
params: tuple[str, ...] = () # = to specify arguments to the script | |
terminal: BoolNone = None # start bash script without opening Terminal | |
refresh: BoolNone = None # make the item refresh the plugin it belongs to. If the item runs a script, refresh is performed after the script finishes. eg. refresh=true | |
dropdown: BoolNone = None # If false, the line will only appear and cycle in the status bar but not in the dropdown | |
length: int = 0 # truncate the line to the specified number of characters. A … will be added to any truncated strings, as well as a tooltip displaying the full string. eg. length=10 | |
trim: BoolNone = None # whether to trim leading/trailing whitespace from the title. true or false (defaults to true) | |
alternate: BoolNone = None # =true to mark a line as an alternate to the previous one for when the Option key is pressed in the dropdown | |
templateImage: str = "" # set an image for this item. The image data must be passed as base64 encoded string and should consist of only black and clear pixels. The alpha channel in the image can be used to adjust the opacity of black content, however. This is the recommended way to set an image for the statusbar. Use a 144 DPI resolution to support Retina displays. The imageformat can be any of the formats supported by Mac OS X | |
image: str = "" # set an image for this item. The image data must be passed as base64 encoded string. Use a 144 DPI resolution to support Retina displays. The imageformat can be any of the formats supported by Mac OS X | |
emojize: BoolNone = ( | |
None # =false will disable parsing of github style :mushroom: into emoji | |
) | |
ansi: BoolNone = None # =false turns off parsing of ANSI codes. | |
disabled: BoolNone = None # =true greyed out the line and disable click | |
magic_number: ClassVar[int] = 19 # only use the 19 attrs above here | |
only_if: bool = True | |
submenu: list[Renderable] = field(default_factory=list, init=False) | |
siblings: list[Renderable] = field(default_factory=list, init=False) | |
@classmethod | |
def _type_hint(cls, key: str, hints: dict[type, dict[str, type]] = {}): | |
if cls not in hints: | |
hints[cls] = get_type_hints(cls, globals()) | |
return hints[cls][key] | |
@property | |
def is_divider(self) -> bool: | |
return self.title == "---" | |
def depth_prefix(self, depth: int = 0) -> str: | |
return f"{'--' * depth}{' ' if depth and not self.is_divider else ''}" | |
def _title(self, depth: int = 0) -> str: | |
return f"{self.depth_prefix(depth)}{self.title}" | |
def subclass_render_hook(self) -> Generator[Renderable, None, None]: | |
yield from () | |
def shell_params(self) -> Iterable[str]: | |
if not self.shell: | |
return () | |
shell, *params = [quote(_) for _ in shlex.split(self.shell)] | |
return (shell, *params, *self.params) | |
def menu_params(self) -> Iterable[tuple[str, Any]]: | |
return ( | |
(k, v) | |
for k, v in ( | |
(k, getattr(self, k)) | |
for k in list(MenuItem.__dataclass_fields__)[1:19] | |
if k != "shell" | |
) | |
if (self._type_hint(k) == BoolNone and v is not None) or v | |
) | |
def all_params(self) -> Iterable[str]: | |
if shell_params := self.shell_params(): | |
shell, *shell_params = shell_params | |
yield f"shell={shell}" | |
yield from (f"param{i}={p}" for i, p in enumerate(shell_params, 1)) | |
yield from (f"{k}={quote(str(v))}" for k, v in self.menu_params()) | |
def render(self, depth: int = 0) -> RenderableGenerator: | |
if self.only_if: | |
yield " | ".join((self._title(depth), *self.all_params())) | |
for item in self.subclass_render_hook(): | |
yield from item.render(depth) | |
for item in self.submenu: | |
yield from item.render(depth + 1) | |
for item in self.siblings: | |
yield from item.render(depth) | |
def add_submenu(self, child: MenuItem) -> MenuItem: | |
self.submenu.append(child) | |
return self | |
def with_submenu(self, *children: Renderable | Iterable[Renderable]) -> MenuItem: | |
return with_something(self, self.submenu, *children) | |
def with_siblings(self, *children: Renderable | Iterable[Renderable]) -> MenuItem: | |
return with_something(self, self.siblings, *children) | |
@dataclass | |
class Divider(MenuItem): | |
title: str = "---" | |
@dataclass | |
class ShellItem(MenuItem): | |
cwd: Union[str, Path, None] = None | |
def __init__( | |
self, | |
title: str, | |
shell: str, | |
cwd: Union[str, Path, None] = None, | |
**kwargs: Any, | |
): | |
super().__init__(title, shell=shell, **kwargs) | |
self.cwd = cwd | |
def __post_init__(self): | |
if isinstance(self.cwd, str): | |
self.cwd = Path(self.cwd) | |
if self.cwd and not self.cwd.exists(): | |
CONFIG.error(f"❌ cwd does not exist at {self.cwd}") | |
if self.shell and not self.params: | |
self.shell, *self.params = (quote(_) for _ in shlex.split(self.shell)) | |
def shell_params(self, use_cwd: bool = True) -> Iterable[str]: | |
shell_params = super().shell_params() | |
if use_cwd and self.cwd: | |
shell_params = ("cd", expanduser(self.cwd), "&&", *shell_params) | |
return shell_params | |
def shell_str(self, use_cwd: bool = False) -> str: | |
return " ".join(self.shell_params(use_cwd=use_cwd)) | |
def subclass_render_hook(self, depth: int = 0) -> Generator[Renderable, None, None]: | |
if CONFIG.DEBUG: | |
yield MenuItem( | |
f"╰─ {self.shell_str(use_cwd=False)}", font="Andale Mono", disabled=True | |
) | |
def run(self) -> str: | |
shell_params = list(self.shell_params(use_cwd=False)) | |
if CONFIG.DEBUG: | |
logger.debug(f"running: {shell_params}") | |
output = subprocess.check_output(shell_params, cwd=self.cwd) | |
return output.decode("utf-8").strip() | |
class TiltShellItem(ShellItem): | |
def __post_init__(self): | |
# use the project dir as the cwd for all tilt commands | |
self.cwd = CONFIG.PROJECT_DIR | |
return super().__post_init__() | |
def shell_params(self, use_cwd: bool = True) -> Iterable[str]: | |
# interpolate `tilt` to use the full path | |
return ( | |
CONFIG.TILT_BIN.as_posix() if re.match(r"(tilt|.*/tilt)", cmd) else cmd | |
for cmd in super().shell_params(use_cwd) | |
) | |
def shell_str(self, use_cwd: bool = False) -> str: | |
# don't interpolate `tilt` for prettier output | |
return " ".join(super().shell_params(use_cwd=use_cwd)) | |
# ---------------------------------------------------------------------------- # | |
# utility functions # | |
# ---------------------------------------------------------------------------- # | |
_no_default = object() | |
T = TypeVar("T") | |
def get_in( | |
keys: str | list[str], | |
coll: dict[object, object] | list[object], | |
default: object = _no_default, | |
) -> Generator[object, None, None]: | |
try: | |
key, *keys = keys.split(".") if isinstance(keys, str) else keys | |
if key == "*": | |
for i in range(len(coll)): | |
yield from get_in([i, *keys], coll, default) | |
elif not keys: | |
yield coll[key] | |
else: | |
yield from get_in(keys, coll[key], default) | |
except (KeyError, IndexError, TypeError): | |
if default is not _no_default: | |
yield default | |
def strify(joiner: str, *args: Any) -> str: | |
"""joins strings and filters out None values""" | |
if len(args) == 1 and not isinstance(args[0], str): | |
return strify(joiner, *args[0]) | |
return joiner.join(str(arg) for arg in args if arg is not None) | |
def check_output(cmd: str, cwd: Union[str, Path, None] = None) -> str: | |
logger.debug(f"running: {cmd}") | |
return subprocess.check_output(cmd, shell=True, cwd=cwd, encoding="utf-8").strip() | |
def with_something( | |
ret: T, | |
key: list[Renderable], | |
*children: Renderable | Iterable[Renderable], | |
) -> T: | |
for child in children: | |
if isinstance(child, Iterable): | |
key.extend(child) | |
else: | |
key.append(child) | |
return ret | |
# ---------------------------------------------------------------------------- # | |
# tilt classes # | |
# ---------------------------------------------------------------------------- # | |
class Status(str, Enum): | |
ERROR = "error" | |
PENDING = "pending" | |
IN_PROGRESS = "in_progress" | |
OK = "ok" | |
UNKNOWN = "unknown" | |
NOT_APPLICABLE = "not_applicable" | |
@property | |
def icon(self) -> str: | |
return { | |
Status.OK: "✔", | |
Status.PENDING: "↺", | |
Status.IN_PROGRESS: "⌛︎", | |
Status.UNKNOWN: "?", | |
Status.NOT_APPLICABLE: "‒", | |
Status.ERROR: "𝙭", | |
}[self] | |
@property | |
def color(self): | |
return { | |
Status.OK: "green", | |
Status.PENDING: "yellow", | |
Status.IN_PROGRESS: "green", | |
Status.ERROR: "red", | |
Status.UNKNOWN: "yellow", | |
Status.NOT_APPLICABLE: "white", | |
}[self] | |
@classmethod | |
def debug_menu(cls) -> Iterable[str]: | |
return ( | |
"-- status icons", | |
*(f"---- {r.icon} {r.name} | color={r.color}" for r in cls), | |
) | |
@dataclass | |
class Resource: | |
kind: str = "" # UIResource | |
metadata: dict[str, object] = field(default_factory=dict) | |
spec: dict[str, object] = field(default_factory=dict) | |
status: dict[str, object] = field(default_factory=dict) | |
kwargs: dict[str, object] = field(default_factory=dict) | |
name: str = field(init=False) | |
runtime_status: Status = field(init=False) | |
update_status: Status = field(init=False) | |
@classmethod | |
def description(cls, name: str, _cache: dict[str, str] = {}) -> str: | |
if CONFIG.DESCRIBE and not _cache: | |
descriptions = TiltShellItem("", "tilt describe uiresource").run() | |
for description in descriptions.split("\n\n\n"): | |
if description.startswith("Name:"): | |
head = description[: description.index("\n")] | |
name = head.split(":")[1].strip() | |
_cache[name] = description.strip() | |
return _cache.get(name, "") | |
def description_items(self) -> list[MenuItem]: | |
return [ | |
MenuItem(t, font="Andale Mono", trim=False) | |
for t in self.description(self.name).split("\n") | |
if t | |
] | |
def __post_init__(self): | |
self.name = str(self.metadata.get("name", "")) | |
self.runtime_status = Status(self.status.get("runtimeStatus", "unknown")) | |
self.update_status = Status(self.status.get("updateStatus", "unknown")) | |
@classmethod | |
def from_status_dict(cls, **kwargs: Any) -> Resource: | |
return cls( | |
**{k: kwargs[k] for k in kwargs if k in cls.__dataclass_fields__}, | |
kwargs={k: kwargs[k] for k in kwargs if k not in cls.__dataclass_fields__}, | |
) | |
@property | |
def best_status(self) -> Status: | |
if self.update_status in (Status.OK, Status.UNKNOWN): | |
return self.runtime_status | |
return self.update_status | |
@property | |
def reason(self) -> str: | |
if self.update_status is Status.ERROR: | |
reasons = get_in("buildHistory.*.error", self.status) | |
return f"errors: {strify(', ', reasons)}" | |
elif self.runtime_status is Status.ERROR and ( | |
error := self.status.get("error") | |
): | |
return f"error: {error}" | |
elif self.best_status is Status.ERROR and (reason := self.status.get("reason")): | |
return str(reason) | |
elif self.best_status is Status.PENDING: | |
reasons = get_in("waiting.on.*.name", self.status) | |
msg = next(get_in("waiting.reason", self.status, "waiting")).replace( | |
"-", " " | |
) | |
return f"{msg}: {strify(', ', reasons)}" | |
return "" | |
@dataclass | |
class TiltStatus: | |
is_running: bool = False | |
@cached_property | |
def pid(self) -> Sequence[int]: | |
try: | |
pids = check_output("pgrep tilt").split("\n") | |
logger.debug(f"pids: {pids}") | |
return [int(pid) for pid in pids if pid] | |
except subprocess.CalledProcessError: | |
return () | |
@cached_property | |
def uiresources(self) -> Sequence[Resource]: | |
with suppress(subprocess.CalledProcessError): | |
response = check_output(f"{CONFIG.TILT_BIN} get uiresources --output=json") | |
self.is_running = True | |
resources = json.loads(response).get("items", []) | |
return [Resource.from_status_dict(**item) for item in resources] | |
return [] | |
@cached_property | |
def resource_status_groups(self) -> dict[Status, Sequence[Resource]]: | |
groups: dict[str, list[Resource]] = defaultdict(list) | |
for resource in self.uiresources: | |
groups[resource.best_status].append(resource) | |
return {k: groups[k] for k in Status if k in groups} | |
def title(self) -> MenuItem: | |
if groups := self.resource_status_groups: | |
tilt_icon = "◢" | |
status_rollup = " ".join(f"{s.icon}{len(groups[s])}" for s in groups) | |
else: | |
tilt_icon = "🛑" | |
status_rollup = "Stopped" | |
return MenuItem(f"{tilt_icon} Tilt {status_rollup}") | |
def resources_menu(self) -> Generator[Renderable, None, None]: | |
yield from ( | |
MenuItem(f"{grouping.icon} {grouping}", color=grouping.color).with_siblings( | |
MenuItem( | |
f"{r.name}", | |
href=f"http://{CONFIG.TILT_HOST}:{CONFIG.TILT_PORT}/r/{r.name}/overview", | |
) | |
.with_submenu( | |
MenuItem(r.reason, disabled=len(r.reason) > 10), | |
ShellItem("🔄 refresh", f"tilt trigger '{r.name}'", terminal=False), | |
ShellItem( | |
"🪵 logs", | |
f"tilt logs --follow --resource '{r.name}'", | |
terminal=True, | |
), | |
ShellItem("🗑️ delete", f"tilt delete {r.kind} {r.name}"), | |
Divider(), | |
*r.description_items(), | |
) | |
.with_siblings( | |
MenuItem( | |
f"╰─ {r.reason}", | |
disabled=True, | |
length=40, | |
only_if=bool(r.reason) and CONFIG.DEBUG, | |
) | |
) | |
for r in sorted(group, key=lambda r: r.name) | |
) | |
for grouping, group in self.resource_status_groups.items() | |
) | |
def commands_menu(self) -> Sequence[Renderable]: | |
tilt_up = TiltShellItem("⬆️ up", "tilt up") | |
return ( | |
Divider(), | |
MenuItem("🧙♂️ commands").with_submenu( | |
ShellItem( | |
"🌅 start-my-day", | |
f"open -a docker && ./scripts/start_my_day.sh && {tilt_up.shell_str()}", | |
cwd=CONFIG.PROJECT_DIR, | |
terminal=True, | |
), | |
tilt_up, | |
TiltShellItem("⬇️ down", "tilt down"), | |
TiltShellItem("🪵 all logs", "tilt logs --follow"), | |
Divider(), | |
ShellItem( | |
"🧨 stop all", "killall tilt", disabled=not self.pid, color="pink" | |
), | |
), | |
Divider(), | |
*( | |
ShellItem( | |
f"🧨 kill {pid}: {check_output(f'ps -p {pid} -o command=').strip()}", | |
f"kill {pid}", | |
color="pink", | |
) | |
for pid in self.pid | |
), | |
) | |
def main(): | |
ts = TiltStatus() | |
m = Menu(ts.title().title).with_items( | |
MenuItem( | |
"👁️ overview", | |
href=f"http://{CONFIG.TILT_HOST}:{CONFIG.TILT_PORT}/overview", | |
only_if=ts.is_running, | |
), | |
*ts.commands_menu(), | |
*ts.resources_menu(), | |
) | |
print(m.render()) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment