Created
April 15, 2020 21:14
-
-
Save nockstarr/d1075dfbc1cef40b34efe21edca54c0f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import subprocess | |
import platform | |
import socket | |
import uuid | |
import re | |
import shutil | |
from io import BufferedReader | |
from typing import Callable, List | |
from pprint import pprint | |
agent_functions = [] | |
def agent(f: Callable): | |
agent_functions.append({"name": f.__name__, "func": f}) | |
return f | |
def bytes_to_mb(_bytes: int) -> int: | |
""" Bytes to Megabytes""" | |
return int(_bytes / float(1 << 20)) | |
def bytes_to_gb(_bytes: int) -> float: | |
""" Bytes to Gigabytes""" | |
# return int(_bytes / float(1 << 30)) | |
return round(_bytes / float(1 << 30), 2) | |
@agent | |
def get_memory_usage() -> dict: | |
""" Get Memory """ | |
# Only for linux/unix https://stackoverflow.com/questions/22102999/get-total-physical-memory-in-python/28161352 | |
mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') | |
return {"memory_usage": { | |
"total_bytes": mem_bytes, | |
"total_mb": bytes_to_mb(mem_bytes), | |
"total_gb": bytes_to_gb(mem_bytes) | |
}} | |
@agent | |
def get_cpu_info() -> dict: | |
# https://stackoverflow.com/a/47203612 | |
cpu_data = (subprocess.check_output("lscpu", shell=True).strip()).decode() | |
splitted_data = [i.split(":") for i in cpu_data.split("\n")] | |
data = {d[0]: d[1].replace(" ", "") for d in splitted_data} | |
if data.get("Flag", False): | |
del data["Flags"] | |
return {"cpu_info": data} | |
@agent | |
def get_hdd_usage() -> dict: | |
""" Get ROOT hdd usage""" | |
# https://stackoverflow.com/a/48929832 | |
total, used, free = shutil.disk_usage("/") | |
return {"hdd_usage": { | |
"total_gb": bytes_to_gb(total), | |
"used_gb": bytes_to_gb(used), | |
"free": bytes_to_gb(free) | |
}} | |
@agent | |
def get_general_sysinfo() -> dict: | |
# https://stackoverflow.com/a/58420504 | |
# https://stackoverflow.com/a/1267524 | |
info = {} | |
try: | |
info['platform'] = platform.system() | |
info['platform-release'] = platform.release() | |
info['platform-version'] = platform.version() | |
info['architecture'] = platform.machine() | |
info['hostname'] = socket.gethostname() | |
# info['ip-address'] = socket.gethostbyname(socket.gethostname()) | |
info['ip-address'] = (([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if | |
not ip.startswith("127.")] or [ | |
[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in | |
[socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0] | |
info['mac-address'] = ':'.join(re.findall('..', '%012x' % uuid.getnode())) | |
info['processor'] = platform.processor() | |
except Exception as e: | |
print(e) | |
return {"general_sysinfo": info} | |
def read_data_stream(stream: BufferedReader): | |
""" Courtesy of my lord and savior Per """ | |
out = "" | |
while True: | |
l = stream.readline().decode() | |
if l == "": | |
break | |
out += l | |
return out | |
def run_cmd(cmd: list) -> str: | |
""" Courtesy of my lord and savior Per """ | |
proc = subprocess.Popen(cmd, | |
shell=False, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT) | |
return read_data_stream(proc.stdout) | |
@agent | |
def get_app_info(apps: list, commands: dict = None) -> dict: | |
if not commands: | |
commands = { | |
"nginx": ["nginx" "-v"], | |
"ssh": ["ssh", "-V"], | |
} | |
result = {} | |
for app in apps: | |
cmd = commands.get(app) | |
result[app] = run_cmd(cmd) | |
return {"app_info": result} | |
@agent | |
def get_services(): | |
services = run_cmd(["service", "--status-all"]) | |
result = [] | |
for service in services.split("\n"): | |
if not service: | |
continue | |
service_obj = {"name": "", "status": ""} | |
if "[ + ]" in service: | |
service_obj["name"] = service.replace(" [ + ] ", "") | |
service_obj["status"] = "running" | |
else: | |
service_obj["name"] = service.replace(" [ - ] ", "") | |
service_obj["status"] = "stopped/inactive" | |
result.append(service_obj) | |
return {"services": result} | |
@agent | |
def get_os_info() -> dict: | |
with open("/etc/os-release", "r") as file: | |
data = file.read() | |
return {"os_info": data} | |
print("MEMORY:") | |
pprint(get_memory_usage()) | |
print("CPU:") | |
pprint(get_cpu_info()) | |
print("General sys:") | |
pprint(get_general_sysinfo()) | |
print("HDD:") | |
pprint(get_hdd_usage()) | |
print("APPS:") | |
pprint(get_app_info(["ssh"])) | |
print("SERVICES:") | |
pprint(get_services()) | |
print(get_memory_usage.__name__) | |
print("OS INFO:") | |
print(get_os_info()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment