Skip to content

Instantly share code, notes, and snippets.

@pkubik
Last active July 1, 2022 17:48
Show Gist options
  • Save pkubik/dfdd6546e142bd4dbe0a98ac2ab2c564 to your computer and use it in GitHub Desktop.
Save pkubik/dfdd6546e142bd4dbe0a98ac2ab2c564 to your computer and use it in GitHub Desktop.
K8s job control with fzf frontend and HTTP flask backend
from __future__ import annotations
from flask import Flask
from flask import request
from flask import abort
from multiprocessing import Process
import subprocess as sp
import sys
import os
import json
from dataclasses import dataclass, field
# Column layout shared by every list row and by the header: the name column is
# left-aligned, the PODS/STATUS/PRIORITY columns are right-aligned.
LINE_FORMAT = "{name:<56} {pods:>16} {status:>12} {priority:>12} {node}"
# Two header lines for fzf: the column titles and a hint about the help screen.
HEADER = "\n".join(
    (
        LINE_FORMAT.format(
            name="NAME", pods="PODS", status="STATUS", priority="PRIORITY", node="NODE"
        ),
        "Press F1 for help.",
    )
)
# Login name (USER, falling back to USERNAME); used to pre-fill the fzf query.
USER = os.environ.get("USER") or os.environ.get("USERNAME")
# Text shown by the F1 ("help") action, paged through less.
HELP = (
    "\n"
    "Type to search.\n"
    "Use arrows to select an entry.\n"
    "Use tab (or shift+tab) to select a pod within the job.\n"
    "Press enter to tail logs from the pod.\n"
    "List of commands:\n"
    "- CTRL+r: update the list\n"
    "- CTRL+t: terminate job\n"
    "- CTRL+x: enter shell on pod\n"
    "- CTRL+s: describe pod\n"
    "- CTRL+e: describe job\n"
    "(press q to exit this screen)\n"
)
@dataclass
class Pod:
    """A single pod as shown in the list: its name, node and status."""

    name: str
    node: str  # spec.nodeName as reported by kubectl
    status: str = field(default="")  # status.phase, or "dying" after a kill request
@dataclass
class Job:
    """A Kubernetes job -- or a pod without a job -- and the pods belonging to it.

    ``selected_pod`` names the pod the list row currently refers to. When it
    equals ``name`` the row is a standalone pod (a pod without a ``job-name``
    label) rather than a job.
    """

    name: str
    # Not read anywhere in this file; the original author noted that it is
    # "just to show some state".
    counter: int = 0
    priority: str = ""
    selected_pod: str = ""
    pods: dict[str, Pod] = field(default_factory=dict)

    @property
    def pod(self):
        """The currently selected Pod; raises KeyError if it is not in ``pods``."""
        return self.pods[self.selected_pod]

    def is_just_pod(self):
        """Return True when this entry is a standalone pod rather than a job."""
        return self.selected_pod == self.name

    def pods_repr(self):
        """Render the PODS column.

        A standalone pod renders as ``-``. A job renders one marker per pod
        (``#`` for the selected pod, ``-`` for the others), followed by the last
        dash-separated component of the selected pod's name.
        """
        if self.is_just_pod():
            return "-"
        markers = "".join(
            "#" if pod.name == self.selected_pod else "-"
            for pod in self.pods.values()
        )
        return markers + " " + self.selected_pod.split("-")[-1]

    def advance_pod(self, offset=1):
        """Move the selection *offset* pods forward (negative: backward), wrapping.

        Does nothing for standalone pods and for jobs without pods. If the
        selected pod is no longer in ``pods``, the first pod is selected instead.
        """
        if self.is_just_pod() or not self.pods:
            return
        names = list(self.pods)
        try:
            index = names.index(self.selected_pod)
        except ValueError:
            # Selected pod vanished; fall back to the first one.
            self.selected_pod = names[0]
            return
        self.selected_pod = names[(index + offset) % len(names)]
@dataclass
class K8s:
    """In-memory view of the cluster's pods grouped by job, plus the fzf line mapping.

    ``db`` maps job name -> Job. ``entries`` maps each rendered list line
    (trailing whitespace removed) back to its job name, so a line echoed back
    by fzf can be resolved. ``exit_action`` holds the action requested by the
    last "final" key binding.
    """

    db: dict[str, Job]
    entries: dict[str, str] = field(default_factory=dict)
    exit_action: str = ""

    def __post_init__(self):
        self.update_entries()

    def update_entries(self):
        """Re-render one line per job and rebuild the line -> job-name map.

        Lines are right-stripped. A pod without a node would otherwise render
        with trailing blanks, and decode_entry strips those before the lookup.
        """
        self.entries = {
            LINE_FORMAT.format(
                name=job.name,
                pods=job.pods_repr(),
                status=job.pod.status,
                priority=job.priority,
                node=job.pod.node,
            ).rstrip(): job.name
            for job in self.db.values()
        }

    def update(self):
        """Reload all pods via ``kubectl get pods -o json`` and regroup them by job.

        Pods are grouped by their ``job-name`` label. A pod without that label
        becomes its own entry, named after the pod. Each refresh rebuilds the
        pod lists, so pods that disappeared are dropped. A job keeps its pod
        selection (and ``counter``) while the selected pod still exists;
        otherwise its first pod is selected.
        """
        pods_json = sp.check_output(["kubectl", "get", "pods", "-o", "json"])
        new_db: dict[str, Job] = {}
        for raw_pod in json.loads(pods_json)["items"]:
            metadata = raw_pod["metadata"]
            # nodeName is only set once a pod is scheduled, and
            # priorityClassName is optional, so neither key can be indexed
            # directly.
            spec = raw_pod.get("spec") or {}
            status = raw_pod.get("status") or {}
            pod_name = metadata["name"]
            job_name = (metadata.get("labels") or {}).get("job-name") or pod_name
            pod = Pod(
                pod_name,
                node=spec.get("nodeName") or "",
                status=status.get("phase") or "",
            )
            job = new_db.get(job_name)
            if job is None:
                previous = self.db.get(job_name)
                job = Job(
                    job_name,
                    counter=previous.counter if previous else 0,
                    priority=spec.get("priorityClassName") or "",
                    selected_pod=previous.selected_pod if previous else pod_name,
                )
                new_db[job_name] = job
            job.pods[pod_name] = pod
        for job in new_db.values():
            if job.selected_pod not in job.pods:
                job.selected_pod = next(iter(job.pods))
        self.db = new_db
        self.update_entries()

    def decode_entry(self, encoded_entry):
        """Map a list line, as handed back by fzf, to its job name.

        The line may arrive wrapped in single quotes (the ``{}`` placeholder in
        the key bindings) or with a trailing newline (fzf's stdout). Surrounding
        whitespace and quotes are therefore stripped first. Raises KeyError for
        unknown lines.
        """
        return self.entries[encoded_entry.strip().strip("'")]

    def by_entry(self, entry):
        """Return the Job shown on list line *entry* (KeyError if unknown)."""
        return self.db[self.decode_entry(entry)]

    def print(self):
        """Return all rendered lines joined by newlines, in ``db`` order (fzf input)."""
        return "\n".join(self.entries)
# Cluster view shared by all request handlers; populated once at startup
# (this runs kubectl) and refreshed by the /refresh route.
k8s = K8s(db={})
k8s.update()
# Flask application serving that state to fzf's key bindings.
app = Flask(__name__)
def get_arg(name):
    """Return query-string parameter *name* of the current request.

    A missing parameter makes the request malformed, so the request is
    aborted with 400 Bad Request.
    """
    value = request.args.get(name)
    if value is None:
        abort(400, description=f"missing query parameter {name!r}")
    return value
def _advance_selected(offset):
    """Shift the selected pod of the job in ``entry`` by *offset*; return the new list."""
    k8s.by_entry(get_arg("entry")).advance_pod(offset)
    k8s.update_entries()
    return k8s.print()


@app.route("/inc")
def inc():
    """Select the next pod of the job under the cursor (bound to tab)."""
    return _advance_selected(1)


@app.route("/dec")
def dec():
    """Select the previous pod of the job under the cursor (bound to shift+tab)."""
    return _advance_selected(-1)
@app.route("/set-action")
def set_action():
    """Record the ``action`` query parameter as the action to run after fzf exits."""
    k8s.exit_action = get_arg("action")
    return ""


@app.route("/action")
def exit_action():
    """Report the action recorded by the last /set-action call ("" if none)."""
    return k8s.exit_action
@app.route("/job")
def decode_job():
    """Resolve the list line in ``entry`` to its job name."""
    return k8s.decode_entry(get_arg("entry"))


@app.route("/pod")
def decode_pod():
    """Resolve the list line in ``entry`` to the name of its selected pod."""
    selected = k8s.by_entry(get_arg("entry"))
    return selected.selected_pod
@app.route("/kill")
def kill():
    """Delete the job behind ``entry`` with kubectl and return the re-rendered list.

    kubectl is invoked with an argument list, so the job name never passes
    through a shell. The job's pods are marked "dying" only when the delete
    command succeeded; a failed delete leaves the displayed status untouched.
    """
    job = k8s.by_entry(get_arg("entry"))
    result = sp.run(
        ["kubectl", "delete", "job", job.name],
        stdout=sp.DEVNULL,
        stderr=sp.DEVNULL,
    )
    if result.returncode == 0:
        for pod in job.pods.values():
            pod.status = "dying"
    k8s.update_entries()
    return k8s.print()
@app.route("/info")
def info():
    """Return the repr of the Job behind ``entry`` (no fzf binding uses this route)."""
    selected_job = k8s.by_entry(get_arg("entry"))
    return str(selected_job)


@app.route("/refresh")
def refresh():
    """Re-read pods from the cluster, then return the freshly rendered list."""
    k8s.update()
    return base()


@app.route("/")
def base():
    """Return the current list, one line per job, as fed to fzf."""
    return k8s.print()
def run_server():
    """Child-process entry point: send all output to a log file, then serve ``app``.

    stdout and stderr share a single line-buffered handle. Two independent
    handles on the same file (one "w", one "a") keep separate offsets, so text
    written through the "w" handle could overwrite text appended through the
    other. Line buffering also makes each line reach the file before the
    parent's ``terminate()`` kills this process without flushing its buffers.
    """
    # Deliberately never closed: it must stay open for the life of the process.
    log = open("/tmp/nihao-server-log.out", "w", buffering=1)
    sys.stdout = sys.stderr = log
    print(f"Starting server at PID {os.getpid()}")
    app.run()
# Serve the state over HTTP from a child process, so that fzf's key bindings
# (which shell out to curl) can query and change it while fzf is running.
# daemon=True: if the main script dies with an exception, multiprocessing
# terminates the child at interpreter exit. Otherwise it would wait forever
# to join a server that never returns.
server = Process(target=run_server, daemon=True)
server.start()
def build_command(name="", add_entry=True, action=""):
    """Return a shell command line that GETs ``/<name>`` from the local server via curl.

    With *add_entry*, the query carries ``entry={}``. The ``{}`` is left for
    the caller to fill in: fzf substitutes the current line in key bindings,
    and ``str.format`` fills it after fzf exits. Otherwise, a non-empty
    *action* is sent as ``action=...``. ``--fail`` makes curl exit non-zero
    on HTTP errors, and curl's stderr is discarded.
    """
    path = f"/{name}" if name else ""
    if add_entry:
        query = ' --data-urlencode "entry={}" '
    elif action:
        query = f' --data-urlencode "action={action}" '
    else:
        query = ""
    return "curl --get localhost:5000" + path + " " + query + "--fail 2>/dev/null"


# Plain fetch of the current list; used as a fallback when a reload command fails.
GET_CMD = build_command(name="", add_entry=False)


def create_binding(keys, action="", add_entry=True, final=False):
    """Return the ``--bind`` arguments for fzf key(s) *keys*.

    A *final* binding records *action* on the server and then accepts the
    current line, which ends fzf. Any other binding reloads the list from the
    ``/<action>`` route, falling back to GET_CMD if that request fails.
    """
    if final:
        command = build_command("set-action", add_entry=False, action=action)
        return ["--bind", f"{keys}:execute({command})+accept"]
    command = build_command(action, add_entry=add_entry)
    return ["--bind", f"{keys}:reload({command} || {GET_CMD})"]
# Command line for the fzf front end. Every key binding talks to the Flask
# server through curl (see create_binding).
FZF_CMD = ["fzf"]
if USER:
    # Pre-fill the query with the user's name to filter to their own entries.
    FZF_CMD += ["-q", f".{USER}."]
FZF_CMD += ["+s", "-n", "1", "--ansi", "--color"]
FZF_CMD += [
    arg
    for keys, action, options in (
        ("ctrl-r", "refresh", {"add_entry": False}),
        ("tab", "inc", {}),
        ("btab", "dec", {}),
        ("ctrl-t", "kill", {}),
        ("ctrl-x", "exec", {"final": True}),
        ("ctrl-s", "describe", {"final": True}),
        ("ctrl-e", "describe-job", {"final": True}),
        ("enter", "logs", {"final": True}),
        ("f1", "help", {"final": True}),
    )
    for arg in create_binding(keys, action, **options)
]
FZF_CMD += ["--header", HEADER]
# Run fzf in the foreground. The key bindings talk to the forked server, so the
# pod selection and the requested action live in that process, not in this
# process's copy of ``k8s``. After fzf exits they are fetched over HTTP.
action = job_name = pod_name = ""
try:
    fzf = sp.run(
        FZF_CMD,
        stdout=sp.PIPE,
        input=base(),
        encoding="UTF-8",
    )
    if fzf.returncode == 0:
        action, job_name, pod_name = (
            sp.check_output(
                build_command(endpoint).format(fzf.stdout), shell=True
            ).decode()
            for endpoint in ("action", "job", "pod")
        )
finally:
    # Always stop the server, even when fzf cannot be started or a query
    # fails, and do it before handing the terminal to kubectl/less below.
    server.terminate()

try:
    if action == "logs":
        sp.run(["kubectl", "logs", "-f", pod_name])
    elif action == "exec":
        sp.run(["kubectl", "exec", "-it", pod_name, "--", "bash"])
    elif action == "describe":
        # The name is passed as $1, so bash never parses it as shell code.
        sp.run(["bash", "-c", 'kubectl describe pod "$1" | less', "bash", pod_name])
    elif action == "describe-job":
        sp.run(["bash", "-c", 'kubectl describe job "$1" | less', "bash", job_name])
    elif action == "help":
        # Feed the text on stdin; no shell quoting of HELP required.
        sp.run(["less"], input=HELP, encoding="UTF-8")
except KeyboardInterrupt:
    # e.g. Ctrl+C to stop following logs: exit quietly instead of a traceback.
    pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment