Skip to content

Instantly share code, notes, and snippets.

@aont
Last active October 28, 2024 09:01
Show Gist options
  • Save aont/6436962b97565800450a98ce76c69bd9 to your computer and use it in GitHub Desktop.
Save aont/6436962b97565800450a98ce76c69bd9 to your computer and use it in GitHub Desktop.
#!./venv/bin/python
import sys
import os
import http.server
import subprocess
# import json
import socket
# Block devices whose power state this service reports and controls.
disk_path_list = ["/dev/sdb", "/dev/sdc"]

# Fail fast at startup when a configured device is absent.  An explicit raise
# is used because ``assert`` statements are removed under ``python -O`` and
# would not say which device is missing.
_missing_disks = [
    disk_path for disk_path in disk_path_list if not os.path.exists(disk_path)
]
if _missing_disks:
    raise FileNotFoundError(
        f"configured disk(s) not found: {', '.join(_missing_disks)}"
    )

# Path of the UNIX domain socket the HTTP server listens on.
socket_path = "/tmp/standby_disks.sock"
def get_drivestate(device_path_list):
    """Query the power state of each drive with ``hdparm -C``.

    Every ``hdparm`` process is started before any output is collected, so
    the drives are probed concurrently.

    Args:
        device_path_list: Iterable of device paths such as ``"/dev/sdb"``.
            A path that appears more than once is queried only once.

    Yields:
        One dict per device, in input order, with the keys ``device_path``,
        ``status``, ``stdout`` and ``stderr`` (the last two are ``bytes``).
        ``status`` is ``"error"`` when ``hdparm`` could not be started,
        exited with a non-zero code, or wrote anything to stderr;
        ``"active"`` when the reported state is ``active/idle``;
        ``"standby"`` when it is ``standby``; and ``"unknown"`` for any
        other output.
    """
    # Maps each device path to its running process, or to the OSError raised
    # while trying to start it.
    launched = {}
    for device_path in device_path_list:
        if device_path in launched:
            continue
        try:
            launched[device_path] = subprocess.Popen(
                ("hdparm", "-C", device_path),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except OSError as exc:
            # e.g. hdparm is not installed; report it instead of aborting the
            # whole generator.
            launched[device_path] = exc

    for device_path, proc in launched.items():
        if isinstance(proc, OSError):
            hdparm_output = b""
            hdparm_stderr = str(proc).encode("utf-8", "replace")
            ret = None
        else:
            # communicate() drains both pipes together; reading stdout to EOF
            # before touching stderr can deadlock once stderr fills up.
            hdparm_output, hdparm_stderr = proc.communicate()
            ret = proc.returncode

        # Take the first token after the marker so the amount of whitespace
        # following the colon does not matter.
        state_fields = hdparm_output.partition(b"drive state is:")[2].split()
        state = state_fields[0] if state_fields else b""

        if hdparm_stderr or ret != 0:
            status = "error"
        elif state == b"active/idle":
            status = "active"
        elif state == b"standby":
            status = "standby"
        else:
            status = "unknown"

        yield {
            "device_path": device_path,
            "status": status,
            "stdout": hdparm_output,
            "stderr": hdparm_stderr,
        }
def standby_disk(device_path_list):
    """Ask each drive to enter standby with ``hdparm -y``.

    Every ``hdparm`` process is started before any output is collected, so
    the requests are issued concurrently.

    Args:
        device_path_list: Iterable of device paths such as ``"/dev/sdb"``.
            A path that appears more than once is handled only once.

    Yields:
        One dict per device, in input order, with the keys ``device_path``,
        ``status``, ``stdout`` and ``stderr`` (the last two are ``bytes``).
        ``status`` is ``"error"`` when ``hdparm`` could not be started,
        exited with a non-zero code, or wrote anything to stderr, and
        ``"success"`` otherwise.
    """
    # Maps each device path to its running process, or to the OSError raised
    # while trying to start it.
    launched = {}
    for device_path in device_path_list:
        if device_path in launched:
            continue
        try:
            launched[device_path] = subprocess.Popen(
                ("hdparm", "-y", device_path),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except OSError as exc:
            # e.g. hdparm is not installed; report it instead of aborting the
            # whole generator.
            launched[device_path] = exc

    for device_path, proc in launched.items():
        if isinstance(proc, OSError):
            hdparm_output = b""
            hdparm_stderr = str(proc).encode("utf-8", "replace")
            ret = None
        else:
            # communicate() drains both pipes together; reading stdout to EOF
            # before touching stderr can deadlock once stderr fills up.
            hdparm_output, hdparm_stderr = proc.communicate()
            ret = proc.returncode

        failed = bool(hdparm_stderr) or ret != 0
        yield {
            "device_path": device_path,
            "status": "error" if failed else "success",
            "stderr": hdparm_stderr,
            "stdout": hdparm_output,
        }
class SimpleHandler(http.server.BaseHTTPRequestHandler):
    """Report drive power states and spin down the drives that are active.

    Every GET request, whatever its path, queries the disks in
    ``disk_path_list`` via ``get_drivestate``, passes the ones reported as
    ``"active"`` to ``standby_disk``, and answers with two HTML tables: the
    observed drive states and the outcome of the standby requests.
    """

    def address_string(self):
        """Return a printable client address for log messages.

        The base implementation returns ``client_address[0]``.
        NOTE(review): peers on a UNIX domain socket presumably have no
        ``(host, port)`` tuple, so a fixed label is used for them rather
        than failing while logging — confirm against the deployment.
        """
        client = self.client_address
        if isinstance(client, tuple) and client:
            return str(client[0])
        return "unix-socket"

    @staticmethod
    def _row_open(result):
        """Return the opening of a table row labelled with the device's base name."""
        name = os.path.basename(result["device_path"]).encode("utf-8")
        return b"<tr>\n<th>" + name + b"</th>\n<td>\n"

    @staticmethod
    def _render_output(result):
        """Return the captured stdout/stderr of *result* as HTML, one ``<br>`` per line."""
        return b"".join(
            label + result[key].replace(b"\n", b"<br>\n") + b"<br>\n"
            for label, key in ((b"stdout: ", "stdout"), (b"stderr: ", "stderr"))
        )

    def do_GET(self):
        """Handle a GET request: report drive states, then put active drives in standby."""
        sys.stderr.write("[info] request\n")

        # Run hdparm and build the whole body before sending any header, so a
        # failure cannot leave a half-written "200 OK" response behind.
        drivestate_result_list = tuple(get_drivestate(disk_path_list))

        chunks = [
            b"<h2>Drive State</h2>\n",
            b"<table><thead></thead><tbody>\n",
        ]
        disk_path_list_active = []
        for drivestate_result in drivestate_result_list:
            status = drivestate_result["status"]
            chunks.append(self._row_open(drivestate_result))
            if status == "error":
                chunks.append(b'<div style="color: red;"><b>error</b>\n')
                chunks.append(self._render_output(drivestate_result))
                chunks.append(b"</div>\n")
            else:
                chunks.append(b"<div>" + status.encode("utf-8") + b"<br>\n</div>\n")
            # Close the cell and row for every status, including errors.
            chunks.append(b"</td>\n</tr>\n")
            if status == "active":
                disk_path_list_active.append(drivestate_result["device_path"])
        chunks.append(b"</tbody></table>\n")

        # Only drives currently spinning are asked to enter standby.
        standby_disk_result_list = tuple(standby_disk(disk_path_list_active))

        chunks.append(b"<h2>Standby Result</h2>\n")
        chunks.append(b"<table><thead></thead><tbody>\n")
        for standby_disk_result in standby_disk_result_list:
            # Label the row with the device this result belongs to.
            chunks.append(self._row_open(standby_disk_result))
            if standby_disk_result["status"] == "error":
                chunks.append(b'<div style="color: red;"><b>error</b>\n')
                chunks.append(self._render_output(standby_disk_result))
                chunks.append(b"</div>")
            elif standby_disk_result["status"] == "success":
                chunks.append(b"<div>")
                chunks.append(self._render_output(standby_disk_result))
                chunks.append(b"</div>")
            chunks.append(b"</td>\n</tr>\n")
        chunks.append(b"</tbody></table>\n")

        body = b"".join(chunks)
        self.send_response_only(200)
        self.send_header("Content-type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
if __name__ == "__main__":
    # Remove a stale socket file left by a previous run; binding a UNIX
    # socket to a path that already exists fails.
    try:
        os.remove(socket_path)
    except FileNotFoundError:
        pass

    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.bind(socket_path)
        sock.listen(5)
        # NOTE(review): 0o777 lets every local user connect and thereby spin
        # down the disks; presumably needed so the reverse proxy's user can
        # reach the socket — consider a narrower mode or group ownership.
        os.chmod(socket_path, 0o777)
        with http.server.HTTPServer(
            sock.getsockname(), SimpleHandler, bind_and_activate=False
        ) as httpd:
            # The constructor always creates its own (unbound) socket; close
            # it before swapping in the UNIX socket so it is not leaked.
            httpd.socket.close()
            httpd.socket = sock
            sys.stderr.write(
                f"[info] Server running on UNIX domain socket: {socket_path}\n"
            )
            httpd.serve_forever()
# nginx configuration: append this block before `location / { ... }`
location /standby_disks/ {
proxy_pass http://unix:/tmp/standby_disks.sock;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment