Last active
July 1, 2022 17:48
-
-
Save pkubik/dfdd6546e142bd4dbe0a98ac2ab2c564 to your computer and use it in GitHub Desktop.
K8s job control with an fzf frontend and an HTTP Flask backend
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from flask import Flask | |
from flask import request | |
from flask import abort | |
from multiprocessing import Process | |
import subprocess as sp | |
import sys | |
import os | |
import json | |
from dataclasses import dataclass, field | |
# Fixed-width row layout shared by the header line and every job entry.
LINE_FORMAT = "{name:<56} {pods:>16} {status:>12} {priority:>12} {node}"

# Column titles rendered with the row layout, followed by a one-line hint.
HEADER = "\n".join(
    [
        LINE_FORMAT.format(
            name="NAME",
            pods="PODS",
            status="STATUS",
            priority="PRIORITY",
            node="NODE",
        ),
        "Press F1 for help.",
    ]
)

# Login name from the environment; falls back to USERNAME when USER is
# unset or empty, and is None when neither is available.
USER = os.getenv("USER") or os.getenv("USERNAME")

# NOTE: "counter" (see Job.counter) is just to show some state.

# Help screen text shown through a pager by the "help" action.
HELP = """
Type to search.
Use arrows to select an entry.
Use tab (or shift+tab) to select a pod within the job.
Press enter to tail logs from the pod.
List of commands:
- CTRL+r: update the list
- CTRL+t: terminate job
- CTRL+x: enter shell on pod
- CTRL+s: describe pod
- CTRL+e: describe job
(press q to exit this screen)
"""
@dataclass
class Pod:
    """Snapshot of a single pod as displayed in the job list."""

    # Pod name as reported in the pod metadata.
    name: str
    # Name of the node the pod is assigned to.
    node: str
    # Pod phase text shown in the STATUS column; empty until known.
    status: str = ""
@dataclass
class Job:
    """A job (or a bare pod listed as a job) together with its pods.

    Besides the pod collection, the object tracks which pod is currently
    selected in the UI; per-pod actions operate on that selection.
    """

    # Job name; for a bare pod this equals the pod's own name.
    name: str
    # Just to show some state; not read by any method of this class.
    counter: int = 0
    # Priority text shown in the PRIORITY column.
    priority: str = ""
    # Name (key into ``pods``) of the currently selected pod.
    selected_pod: str = ""
    # Pods of this job keyed by pod name, in insertion order.
    pods: "dict[str, Pod]" = field(default_factory=dict)

    @property
    def pod(self):
        """Return the currently selected pod.

        Raises:
            KeyError: if ``selected_pod`` is not a key of ``pods``.
        """
        return self.pods[self.selected_pod]

    def is_just_pod(self):
        """Return True when the selected pod's name equals the job name."""
        return self.selected_pod == self.name

    def pods_repr(self):
        """Return the text for the PODS column.

        A bare pod is rendered as ``"-"``. Otherwise one marker is emitted per
        pod (``#`` for the selected one, ``-`` for the others), followed by a
        space and the last ``-``-separated part of the selected pod's name.
        """
        if self.is_just_pod():
            return "-"
        markers = "".join(
            "#" if pod.name == self.selected_pod else "-"
            for pod in self.pods.values()
        )
        suffix = self.selected_pod.split("-")[-1]
        return markers + " " + suffix

    def advance_pod(self, offset=1):
        """Move the selection ``offset`` pods forward, wrapping around.

        Does nothing for a bare pod or when there are no pods. When the
        current selection is not among the pods, the first pod is selected.
        """
        if self.is_just_pod() or not self.pods:
            return
        pods = list(self.pods)
        try:
            index = pods.index(self.selected_pod)
        except ValueError:
            # Only "selection not found" is recovered from; a bare ``except``
            # would also hide KeyboardInterrupt and genuine programming errors.
            self.selected_pod = pods[0]
        else:
            self.selected_pod = pods[(index + offset) % len(pods)]
@dataclass
class K8s:
    """In-memory view of the cluster's pods, grouped into jobs.

    ``db`` maps job names to ``Job`` objects. ``entries`` maps each rendered
    listing line back to the job name it was rendered from, so a line picked
    in the UI can be resolved to its job.
    """

    # Job objects keyed by job name.
    db: "dict[str, Job]"
    # Rendered listing line -> job name.
    entries: dict[str, str] = field(default_factory=dict)
    # Name of the action to perform after the picker exits.
    exit_action: str = ""

    def __post_init__(self):
        self.update_entries()

    def update_entries(self):
        """Re-render one listing line per job from the current ``db``."""
        self.entries = {
            LINE_FORMAT.format(
                name=job.name,
                pods=job.pods_repr(),
                status=job.pod.status,
                priority=job.priority,
                node=job.pod.node,
            ): job.name
            for job in self.db.values()
        }

    def update(self):
        """Reload all pods via ``kubectl get pods -o json`` and rebuild ``db``.

        Pods are grouped by their ``job-name`` label; a pod without that label
        becomes a job of its own, named after the pod. Jobs are rebuilt from
        scratch so pods that no longer exist are dropped. The previous pod
        selection and counter of a job are carried over when still applicable.

        Raises:
            subprocess.CalledProcessError: if ``kubectl`` exits with an error.
        """
        pods_json = sp.check_output(["kubectl", "get", "pods", "-o", "json"])
        raw_pods = json.loads(pods_json)["items"]

        new_db = {}
        for raw_pod in raw_pods:
            metadata = raw_pod["metadata"]
            spec = raw_pod.get("spec", {})
            status = raw_pod.get("status", {})
            pod_name = metadata["name"]
            job_name = metadata.get("labels", {}).get("job-name") or pod_name

            # nodeName / priorityClassName may be absent (e.g. a pod that has
            # not been scheduled yet); show an empty column instead of failing.
            pod = Pod(
                pod_name,
                node=spec.get("nodeName", ""),
                status=status.get("phase", ""),
            )

            job = new_db.get(job_name)
            if job is None:
                previous = self.db.get(job_name)
                job = Job(
                    job_name,
                    counter=previous.counter if previous is not None else 0,
                    priority=spec.get("priorityClassName", ""),
                    selected_pod=pod_name,
                )
                new_db[job_name] = job
            job.pods[pod_name] = pod

        # Keep the user's pod selection across refreshes when that pod still
        # exists; otherwise the first pod seen for the job stays selected.
        for job_name, job in new_db.items():
            previous = self.db.get(job_name)
            if previous is not None and previous.selected_pod in job.pods:
                job.selected_pod = previous.selected_pod

        self.db = new_db
        self.update_entries()

    def decode_entry(self, encoded_entry):
        """Return the job name for a listing line.

        Surrounding whitespace and single quotes are stripped before lookup.

        Raises:
            KeyError: if the line is not a currently rendered entry.
        """
        return self.entries[encoded_entry.strip().strip("'")]

    def by_entry(self, entry):
        """Return the ``Job`` a listing line refers to (KeyError if unknown)."""
        return self.db[self.decode_entry(entry)]

    def print(self):
        """Return all listing lines joined with newlines."""
        return "\n".join(self.entries)
# Shared job database for this process, filled from the cluster right away.
k8s = K8s(db={})
k8s.update()

# Flask application whose routes serve and mutate the job database.
app = Flask(__name__)
def get_arg(name):
    """Return query argument ``name`` of the current request.

    Aborts the request with a 404 response when the argument is missing.
    """
    value = request.args.get(name)
    if value is not None:
        return value
    abort(404)
@app.route("/inc")
def inc():
    """Select the next pod of the job given by ``entry``; return the listing."""
    job = k8s.by_entry(get_arg("entry"))
    job.advance_pod()
    # Re-render so the listing reflects the new selection.
    k8s.update_entries()
    return k8s.print()
@app.route("/dec")
def dec():
    """Select the previous pod of the job given by ``entry``; return the listing."""
    job = k8s.by_entry(get_arg("entry"))
    job.advance_pod(-1)
    # Re-render so the listing reflects the new selection.
    k8s.update_entries()
    return k8s.print()
@app.route("/set-action")
def set_action():
    """Store the ``action`` query argument as the exit action; empty response."""
    k8s.exit_action = get_arg("action")
    return ""
@app.route("/action")
def exit_action():
    """Return the exit action most recently stored via ``/set-action``."""
    stored_action = k8s.exit_action
    return stored_action
@app.route("/job")
def decode_job():
    """Return the job name that the ``entry`` listing line refers to."""
    return k8s.decode_entry(get_arg("entry"))
@app.route("/pod")
def decode_pod():
    """Return the selected pod name of the job the ``entry`` line refers to."""
    job = k8s.by_entry(get_arg("entry"))
    return job.selected_pod
@app.route("/kill")
def kill():
    """Delete the job given by ``entry`` and return the refreshed listing.

    All pods of the job are marked ``"dying"`` in the local view before
    ``kubectl delete job`` runs. The command's output is discarded and its
    exit status is not checked, so the marking happens even on failure.
    """
    entry = get_arg("entry")
    job = k8s.by_entry(entry)
    for pod in job.pods.values():
        pod.status = "dying"
    # Argument list instead of a shell string: the job name is passed to
    # kubectl verbatim and is never interpreted by a shell.
    # NOTE(review): entries that are bare pods (no job-name label) are also
    # deleted as "job" here, which presumably fails silently - confirm intent.
    sp.run(
        ["kubectl", "delete", "job", job.name],
        stdout=sp.DEVNULL,
        stderr=sp.DEVNULL,
    )
    k8s.update_entries()
    return k8s.print()
@app.route("/info")
def info():
    """Return the string form of the job the ``entry`` line refers to."""
    return str(k8s.by_entry(get_arg("entry")))
@app.route("/refresh")
def refresh():
    """Reload the pods from the cluster and return the new listing."""
    k8s.update()
    listing = k8s.print()
    return listing
@app.route("/")
def base():
    """Return the current listing without modifying any state."""
    listing = k8s.print()
    return listing
def run_server(log_path="/tmp/nihao-server-log.out"):
    """Run the Flask app with stdout and stderr redirected to a log file.

    Args:
        log_path: File that receives everything the server prints. It is
            truncated on start.
    """
    # A single line-buffered handle shared by both streams. Two independent
    # handles on the same file ("w" and "a") keep separate offsets and
    # buffers, so stderr output could overwrite what stdout had appended.
    # The handle is intentionally left open while the server runs.
    log = open(log_path, "w", buffering=1)
    sys.stdout = sys.stderr = log
    print(f"Starting server at PID {os.getpid()}")
    app.run()
# Serve the HTTP backend from a child process so this process can run fzf.
server = Process(target=run_server)
server.start()
def build_command(name="", add_entry=True, action=""):
    """Build the curl shell command that queries one server endpoint.

    Args:
        name: Endpoint name without the leading slash; empty for the root.
        add_entry: When true, add an ``entry={}`` query argument whose ``{}``
            placeholder is left for the caller (or fzf) to fill in.
        action: Value for an ``action`` query argument; only used when
            ``add_entry`` is false.

    Returns:
        The command line as a single string with stderr discarded.
    """
    path = f"/{name}" if name else ""
    if add_entry:
        query = ' --data-urlencode "entry={}" '
    elif action:
        query = f' --data-urlencode "action={action}" '
    else:
        query = ""
    return "curl --get localhost:5000" + path + " " + query + "--fail 2>/dev/null"
# Plain fetch of the root listing, with no entry or action argument.
GET_CMD = build_command(name="", add_entry=False)
def create_binding(keys, action="", add_entry=True, final=False):
    """Return the ``--bind`` option pair that wires ``keys`` to ``action``.

    A final binding stores ``action`` on the server through ``set-action``
    and then accepts the current line. Any other binding reloads the list
    from the ``action`` endpoint, falling back to the plain listing when
    that command fails.
    """
    if final:
        command = build_command("set-action", add_entry=False, action=action)
        binding = f"{keys}:execute({command})+accept"
    else:
        command = build_command(action, add_entry=add_entry)
        binding = f"{keys}:reload({command} || {GET_CMD})"
    return ["--bind", binding]
# fzf command line: optional initial query, display options, key bindings.
FZF_CMD = ["fzf"]
if USER:
    # Start with the list filtered to entries containing ".<user>.".
    FZF_CMD += ["-q", f".{USER}."]
FZF_CMD += ["+s", "-n", "1", "--ansi", "--color"]
FZF_CMD += create_binding("ctrl-r", "refresh", add_entry=False)
FZF_CMD += create_binding("tab", "inc")
FZF_CMD += create_binding("btab", "dec")
FZF_CMD += create_binding("ctrl-t", "kill")
FZF_CMD += create_binding("ctrl-x", "exec", final=True)
FZF_CMD += create_binding("ctrl-s", "describe", final=True)
FZF_CMD += create_binding("ctrl-e", "describe-job", final=True)
FZF_CMD += create_binding("enter", "logs", final=True)
FZF_CMD += create_binding("f1", "help", final=True)
FZF_CMD += ["--header", HEADER]

# Run the picker and, when a line was accepted, ask the server which action
# was requested and which job/pod the line refers to. An empty action means
# there is nothing to do afterwards.
action = job_name = pod_name = ""
try:
    fzf = sp.run(
        FZF_CMD,
        stdout=sp.PIPE,
        input=base(),
        encoding="UTF-8",
    )
    if fzf.returncode == 0:
        action, job_name, pod_name = (
            sp.check_output(
                build_command(endpoint).format(fzf.stdout), shell=True
            ).decode()
            for endpoint in ("action", "job", "pod")
        )
finally:
    # Always stop the server child, even when fzf or curl raised; a child
    # left running would keep the interpreter from exiting.
    server.terminate()

try:
    if action == "logs":
        sp.run(["kubectl", "logs", "-f", pod_name])
    elif action == "exec":
        sp.run(["kubectl", "exec", "-it", pod_name, "--", "bash"])
    elif action == "describe":
        # The name is handed to bash as $1 rather than spliced into the
        # script text, so it is never parsed as shell syntax.
        sp.run(["bash", "-c", 'kubectl describe pod "$1" | less', "bash", pod_name])
    elif action == "describe-job":
        sp.run(["bash", "-c", 'kubectl describe job "$1" | less', "bash", job_name])
    elif action == "help":
        sp.run(["bash", "-c", f"echo '{HELP}' | less"])
except KeyboardInterrupt:
    # Ctrl+C is the normal way to leave e.g. a followed log; exit quietly.
    pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment