Created
January 6, 2024 21:55
-
-
Save learnitall/2c86da0a05bb2810bd8b013f0b60bdcc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
status.py: Display status of resources in the terminal. | |
Requires rich and mctools. Developed on Python 3.10. | |
""" | |
from datetime import datetime | |
import time | |
from string import digits, printable | |
from mctools import PINGClient, RCONClient | |
import psutil | |
from pystemd.systemd1 import Unit | |
from rich.align import Align | |
from rich.console import Group | |
from rich.layout import Layout | |
from rich.live import Live | |
from rich.panel import Panel | |
from rich.progress import Progress, TextColumn, BarColumn | |
from rich.table import Table | |
from rich.text import Text | |
# Seconds slept between successive redraws of the live display.
RENDER_INTERVAL_SECONDS = 1

# Host passed to both the ping client and the RCON client.
MINECRAFT_HOST = "localhost"

# Moon glyphs, looked up with (day count % 8); index 0 is the full moon.
MOON_PHASES = (
    "🌕",
    "🌖",
    "🌗",
    "🌘",
    "🌑",
    "🌒",
    "🌓",
    "🌔",
)
HEADER_ART = """+------+. | |
|`. | `. | |
| `+--+---+ | |
| | | | MCPaper | |
+---+--+. | Status | |
`. | `.| Display | |
`+------+""" | |
def percentage_to_color(percent: float) -> str:
    """
    Map a fractional load onto a traffic-light color name.

    * percent <= 0.33: green
    * 0.33 < percent <= 0.66: yellow
    * anything else: red
    """
    # Bands are checked in ascending order; the first ceiling that the
    # value does not exceed decides the color.
    for ceiling, color in ((0.33, "green"), (0.66, "yellow")):
        if percent <= ceiling:
            return color
    return "red"
def format_bytes(num, suffix="B") -> str:
    """
    Render a byte count with a binary (1024-based) unit prefix.

    The value is divided by 1024 until its magnitude drops below 1024 or
    the prefixes up to "Zi" are exhausted, in which case "Yi" is used.

    From: https://stackoverflow.com/questions/1094841/get-human-readable-version-of-file-size
    """
    value = num
    for prefix in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
        if abs(value) >= 1024.0:
            # Still too large for this prefix; scale down and try the next.
            value = value / 1024.0
            continue
        return f"{value:3.1f}{prefix}{suffix}"
    return f"{value:.1f}Yi{suffix}"
def only_digits(target: str) -> str:
    """Return *target* with every character that is not 0-9 removed."""
    kept = filter(lambda ch: ch in digits, target)
    return "".join(kept)
def only_ascii(target: str) -> str:
    """
    Return *target* keeping only characters found in string.printable.

    Control characters such as ESC and all non-ASCII characters are dropped;
    whitespace listed in string.printable is kept.
    """
    kept = filter(lambda ch: ch in printable, target)
    return "".join(kept)
def new_basic_progress_bar(
    background_color: str = "bar.back",
    progress_color: str = "bar.complete",
) -> Progress:
    """
    Build a Progress with a bold "<description>: <completed>%" label and a bar.

    background_color styles the unfilled part of the bar and progress_color
    the filled part. Auto refresh is disabled and the widget expands to the
    available width.
    """
    label_column = TextColumn(
        "[bold][progress.description]{task.description}: [/bold]"
        "{task.completed:.2f}%",
    )
    bar_column = BarColumn(
        style=background_color,
        complete_style=progress_color,
    )
    return Progress(label_column, bar_column, auto_refresh=False, expand=True)
def render_header() -> Group:
    """
    Build the header: the green ASCII-art banner above the current time.
    """
    banner = Text(HEADER_ART, justify="left", style="green bold")
    clock = Text(datetime.now().ctime(), justify="center")
    return Group(banner, clock)
def render_cpu_usage() -> Progress:
    """
    Build a progress bar showing the CPU utilisation reported by psutil.

    The bar color follows percentage_to_color for the same reading.
    Assumes that at least 0.1 seconds have elapsed between calls.
    """
    cpu_percent = psutil.cpu_percent()
    bar_color = percentage_to_color(cpu_percent / 100)
    bar = new_basic_progress_bar(progress_color=bar_color)
    task_id = bar.add_task("Average CPU Time")
    # The task starts at zero, so advancing by the reading sets it.
    bar.update(task_id, advance=cpu_percent)
    return bar
def render_mem_usage() -> Progress:
    """
    Build a progress bar showing the percentage of memory still available.

    The bar length is the available fraction, while its color is chosen
    from the used fraction (1 - available).
    """
    snapshot = psutil.virtual_memory()
    available_fraction = snapshot.available / snapshot.total
    bar_color = percentage_to_color(1 - available_fraction)
    bar = new_basic_progress_bar(progress_color=bar_color)
    task_id = bar.add_task("Memory Free")
    bar.update(task_id, advance=available_fraction * 100)
    return bar
class NetworkUsageHandler:
    """
    NetworkUsageHandler renders the network usage of each interface.

    The counters returned by psutil are cumulative, so the handler keeps
    the counters and timestamp of the previous render() call and reports
    the byte rate over the interval between the two calls.
    """

    # Interfaces that are never listed in the table.
    OMIT = ("lo", "virbr0", "docker0")

    def __init__(self):
        # Counters and wall-clock time captured by the previous render();
        # both stay None until the first call has run.
        self.prev_counters = None
        self.prev_timestamp = None

    @staticmethod
    def bps_to_str(raw: float) -> str:
        """Format a bytes-per-second rate as a human-readable string."""
        return f"{format_bytes(raw)} /s"

    def render(self) -> Table:
        """
        Return a table with one row per interface seen in both samples.

        The first call only records a baseline and returns a table with
        headers but no rows.

        Raises:
            ValueError: if psutil reports no interface counters at all.
        """
        t = Table(box=False, expand=True, pad_edge=False)
        t.add_column("Netdev", justify="left", style="cyan")
        for prefix in ("Rx", "Tx"):
            t.add_column(
                f"{prefix} Bytes", justify="right", style="green",
            )
            t.add_column(
                f"{prefix} Drops", justify="right", style="red",
            )

        timestamp = datetime.now()
        counters = psutil.net_io_counters(pernic=True)
        if not counters:
            raise ValueError(
                "Unable to grab inet counters, got empty result"
            )
        for iname in self.OMIT:
            counters.pop(iname, None)

        if self.prev_counters is None or self.prev_timestamp is None:
            self.prev_counters, self.prev_timestamp = counters, timestamp
            return t

        # total_seconds() keeps the fractional part (and any days), unlike
        # the .seconds attribute, which truncates and can be 0.
        delta_time = (timestamp - self.prev_timestamp).total_seconds()
        if delta_time <= 0:
            # No measurable interval (or the wall clock went backwards):
            # keep the old baseline and show no rates rather than divide
            # by zero.
            return t

        # Sorted so rows keep a stable order from one redraw to the next.
        for iname in sorted(self.prev_counters.keys() & counters.keys()):
            counts = counters[iname]
            prev_counts = self.prev_counters[iname]
            rx_speed = (counts.bytes_recv - prev_counts.bytes_recv) / delta_time
            tx_speed = (counts.bytes_sent - prev_counts.bytes_sent) / delta_time
            # dropin counts dropped incoming packets (Rx) and dropout
            # dropped outgoing packets (Tx).
            t.add_row(
                iname,
                self.bps_to_str(rx_speed), str(counts.dropin),
                self.bps_to_str(tx_speed), str(counts.dropout),
            )

        self.prev_counters = counters
        self.prev_timestamp = timestamp
        return t
def render_cpu_temperature() -> Text:
    """
    Render CPU temperature in celsius.

    Assumes the system only has a single CPU. The reading is colored red
    within 10 degrees of the sensor's "high" threshold, yellow within 20
    degrees and green otherwise; 100.0 is used when the sensor reports no
    threshold.

    Returns:
        A red notice when no temperature sensors can be read, otherwise
        the labelled, colored temperature.

    Raises:
        KeyError: if there is no "coretemp" sensor group, or no entry in
            it whose label ends with "id 0".
    """
    # psutil only defines sensors_temperatures on some platforms; treat a
    # missing function the same as an empty result.
    read_temps = getattr(psutil, "sensors_temperatures", None)
    temps = read_temps() if read_temps is not None else {}
    if not temps:
        return Text(
            "OS does not support reading hardware temp", style="red",
        )

    core_temps = temps.get("coretemp")
    if core_temps is None:
        raise KeyError("Unable to find cpu temperature (coretemp)")

    # Some systems have 'Physical id 0', others have
    # 'Package id 0', so just check for something that ends
    # with 'id 0'. Individual cores are formatted as "Core x".
    cputemp = next(
        (entry for entry in core_temps if entry.label.endswith("id 0")),
        None,
    )
    if cputemp is None:
        # Report the available entries in the error itself instead of
        # printing them to stdout in the middle of a render.
        raise KeyError(
            "Unable to find temperature for CPU (Physical id 0): "
            f"{core_temps}"
        )

    t = Text("CPU Temperature: ")
    t.stylize("bold")

    high = cputemp.high if cputemp.high is not None else 100.0
    if high - 10 <= cputemp.current:
        color = "red"
    elif high - 20 <= cputemp.current:
        color = "yellow"
    else:
        color = "green"

    temp = Text(f"{cputemp.current}°C")
    temp.stylize(color)
    t.append(temp)
    return t
def render_systemd_unit(name: str) -> Text:
    """
    Build a "<name>: <STATE> (PID <pid>)" line for one systemd unit.

    The state is green when "active", red when "failed" and yellow for
    every other state.
    """
    unit = Unit(name.encode("utf-8"), _autoload=True)
    main_pid = unit.Service.MainPID
    active_state = unit.Unit.ActiveState.decode("utf-8")

    if active_state == "active":
        state_color = "green"
    elif active_state == "failed":
        state_color = "red"
    else:
        state_color = "yellow"

    line = Text(f"{str(name)}: ")
    line.stylize("bold")

    status = Text(f"{active_state.upper()} (PID {main_pid})")
    status.stylize(state_color)
    line.append(status)
    return line
def _strip_reset_residue(text: str) -> str:
    """Remove the literal "[0m" from both ends of *text*, and nothing else."""
    # Presumably what is left of an ANSI reset sequence once only_ascii has
    # dropped the non-printable ESC byte. str.strip("[0m") is not suitable
    # here: it strips any of the characters "[", "0" and "m", which eats
    # real text such as a trailing "m".
    residue = "[0m"
    while text.startswith(residue):
        text = text[len(residue):]
    while text.endswith(residue):
        text = text[:-len(residue)]
    return text


def _rcon_query_int(rcon, command: str, out: Text) -> "int | None":
    """
    Run *command* over RCON and return the digits of the reply as an int.

    Appends a yellow warning to *out* and returns None when the reply is
    not a string. Returns None without a warning when the reply contains
    no digits at all.
    """
    reply = rcon.command(command)
    if not isinstance(reply, str):
        out.append_text(Text(
            f"\nUnknown response from rcon for '{command}': {reply}",
            style="yellow",
        ))
        return None
    found = only_digits(reply)
    return int(found) if found else None


def render_mc_server_info(
    host: str, ping_port: int, rcon_port: int, password: str,
) -> Text:
    """
    Render information about a Minecraft server.

    Information is pulled using the Server List Ping interface and the
    RCON interface.

    See https://wiki.vg/Server_List_Ping and https://wiki.vg/RCON
    for more information.

    Args:
        host: Server host, used for both the ping and the RCON client.
        ping_port: Port passed to the ping client.
        rcon_port: Port passed to the RCON client.
        password: RCON password.

    Returns:
        A Text with the MOTD, online player count, ping and version from
        the ping reply, followed by the in-game time and moon phase from
        RCON. If the ping fails, the error message is returned in red and
        RCON is not contacted. If RCON authentication fails, a warning is
        appended and the RCON fields are omitted.
    """
    t = Text(f"MC Server {host}: ")
    t.stylize("bold")

    pclient = PINGClient(host, port=ping_port)
    try:
        stats = pclient.get_stats(return_packet=False)
        if not isinstance(stats, dict):
            raise TypeError(
                f"Unknown response from mc server {host}, expected dict: "
                f"{stats}",
            )
    except Exception as err:
        t.append(f"{str(err)}")
        t.stylize("red")
        return t
    finally:
        # Stop the client on the failure path as well as on success.
        pclient.stop()

    motd = stats.get("description")
    # Remove non-ascii characters
    if motd is not None:
        t.append(_strip_reset_residue(only_ascii(motd)))

    num_online = stats.get("players", {}).get("online")
    if num_online is not None:
        t.append(Text("\nOnline Players: ", style="bold"))
        t.append(f"{str(num_online)}")

    latency = stats.get("time")
    if latency is not None:
        t.append(Text("\nPing: ", style="bold"))
        t.append(f"{latency*100:.2f}ms")

    version = stats.get("version", {}).get("name")
    if version is not None:
        t.append(Text("\nVersion: ", style="bold"))
        t.append(f"{version}")

    rcon = RCONClient(host, port=rcon_port)
    try:
        if not rcon.login(password):
            t.append_text(
                Text(
                    "\nWARN: Unable to authenticate to RCON",
                    style="yellow",
                )
            )
            # Without a login there is nothing useful to query.
            return t

        time_ticks = _rcon_query_int(rcon, "/time query daytime", t)
        if time_ticks is not None:
            # The tick count is divided by 20 and the result is formatted
            # as seconds past midnight.
            time_secs = time_ticks / 20.0
            time_str = time.strftime("%H:%M:%S", time.gmtime(time_secs))
            t.append(Text("\nTime: ", style="bold"))
            t.append(f"{time_str}")

        n_days = _rcon_query_int(rcon, "/time query day", t)
        if n_days is not None:
            phase = MOON_PHASES[n_days % len(MOON_PHASES)]
            t.append(Text("\nMoon Phase: ", style="bold"))
            t.append(f"{phase}")
    finally:
        # Stop the client even when a command raises.
        rcon.stop()

    return t
def render_all(network_usage_handler: NetworkUsageHandler) -> Align:
    """
    Assemble the complete dashboard.

    The root layout is split into three columns. The left column is split
    again into the header and the Minecraft server panel, the middle
    column lists the systemd units and the right column shows CPU, memory
    and network usage. Every piece is bottom-aligned and centered.
    """
    root = Layout(name="root")
    root.split_row(
        Layout(name="left"),
        Layout(name="middle"),
        Layout(name="right"),
    )
    root["left"].split_row(
        Layout(name="left-left"),
        Layout(name="left-right"),
    )

    root["left-left"].update(
        Align.center(render_header(), vertical="bottom")
    )

    # NOTE(review): the ping port, RCON port and RCON password are
    # hard-coded here.
    server_info = render_mc_server_info(
        MINECRAFT_HOST, 13337, 25575,
        "time-for-holiday-break-and-to-play-minecraft-8",
    )
    server_panel = Panel(Group(server_info), title="Minecraft Server Info")
    root["left-right"].update(
        Align.center(server_panel, vertical="bottom")
    )

    unit_names = (
        "grafana.service",
        "prometheus.service",
        "minecraft-exporter.service",
        "prometheus-node-exporter.service",
        "minecraft-server.service",
        "caddy.service",
    )
    unit_lines = [render_systemd_unit(unit_name) for unit_name in unit_names]
    units_panel = Panel(
        Align.center(Group(*unit_lines), vertical="bottom"),
        title="Systemd Units",
    )
    root["middle"].update(
        Align.center(units_panel, vertical="bottom")
    )

    node_panel = Panel(
        Group(
            render_cpu_usage(),
            render_mem_usage(),
            network_usage_handler.render(),
        ),
        title="Node Info",
    )
    root["right"].update(
        Align.center(node_panel, vertical="bottom")
    )

    return Align.center(root, vertical="bottom")
def main():
    """
    Run the dashboard until interrupted.

    Draws the display once, then redraws it every RENDER_INTERVAL_SECONDS.
    A KeyboardInterrupt raised while sleeping or redrawing ends the loop
    quietly.
    """
    network_usage_handler = NetworkUsageHandler()

    def do_render() -> Align:
        # One handler is shared by every redraw so that it can compare
        # successive network counter samples.
        return render_all(network_usage_handler)

    with Live(do_render()) as live:
        try:
            while True:
                time.sleep(RENDER_INTERVAL_SECONDS)
                live.update(do_render())
        except KeyboardInterrupt:
            return


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment