Skip to content

Instantly share code, notes, and snippets.

@synodriver
Created October 5, 2022 16:21
Show Gist options
  • Save synodriver/de31139d8d712acae1ec4942d6dc832f to your computer and use it in GitHub Desktop.
Save synodriver/de31139d8d712acae1ec4942d6dc832f to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import ctypes
from collections import namedtuple
from ctypes import wintypes
# import win32process
class MODULEENTRY32W(ctypes.Structure):
_fields_ = [
("dwSize", wintypes.DWORD),
("th32ModuleID", wintypes.DWORD),
("th32ProcessID", wintypes.DWORD),
("GlblcntUsage", wintypes.DWORD),
("ProccntUsage", wintypes.DWORD),
("modBaseAddr", wintypes.PBYTE),
("modBaseSize", wintypes.DWORD),
("hModule", wintypes.HMODULE),
("szModule", wintypes.WCHAR * 256),
("szExePath", wintypes.WCHAR * 260),
]
class PROCESSENTRY32W(ctypes.Structure):
_fields_ = [
("dwSize", wintypes.DWORD),
("cntUsage", wintypes.DWORD),
("th32ProcessID", wintypes.DWORD),
("th32DefaultHeapID", ctypes.c_uint64),
("th32ModuleID", wintypes.DWORD),
("cntThreads", wintypes.DWORD),
("th32ParentProcessID", wintypes.DWORD),
("pcPriClassBase", wintypes.LONG),
("dwFlags", wintypes.DWORD),
("szExeFile", wintypes.WCHAR * 260),
]
class THREADENTRY32(ctypes.Structure):
_fields_ = [
("dwSize", wintypes.DWORD),
("cntUsage", wintypes.DWORD),
("th32ThreadID", wintypes.DWORD),
("th32OwnerProcessID", wintypes.DWORD),
("tpBasePri", wintypes.LONG),
("tpDeltaPri", wintypes.LONG),
("dwFlags", wintypes.DWORD),
]
CloseHandle = ctypes.windll.kernel32.CloseHandle
CloseHandle.argtypes = [wintypes.HANDLE]
CloseHandle.restype = ctypes.c_int
OpenProcess = ctypes.windll.kernel32.OpenProcess
OpenProcess.argtypes = [wintypes.DWORD, ctypes.c_int, wintypes.DWORD]
OpenProcess.restype = wintypes.HANDLE
GetPriorityClass = ctypes.windll.kernel32.GetPriorityClass
GetPriorityClass.argtypes = [wintypes.HANDLE]
GetPriorityClass.restype = wintypes.DWORD
CreateToolhelp32Snapshot = ctypes.windll.kernel32.CreateToolhelp32Snapshot
CreateToolhelp32Snapshot.argtypes = [wintypes.DWORD, wintypes.DWORD]
CreateToolhelp32Snapshot.restype = wintypes.HANDLE
Module32FirstW = ctypes.windll.kernel32.Module32FirstW
Module32FirstW.argtypes = [wintypes.HANDLE, ctypes.POINTER(MODULEENTRY32W)]
Module32FirstW.restype = ctypes.c_int
Module32NextW = ctypes.windll.kernel32.Module32NextW
Module32NextW.argtypes = [wintypes.HANDLE, ctypes.POINTER(MODULEENTRY32W)]
Module32NextW.restype = ctypes.c_int
Process32FirstW = ctypes.windll.kernel32.Process32FirstW
Process32FirstW.argtypes = [wintypes.HANDLE, ctypes.POINTER(PROCESSENTRY32W)]
Process32FirstW.restype = ctypes.c_int
Process32NextW = ctypes.windll.kernel32.Process32NextW
Process32NextW.argtypes = [wintypes.HANDLE, ctypes.POINTER(PROCESSENTRY32W)]
Process32NextW.restype = ctypes.c_int
Thread32First = ctypes.windll.kernel32.Thread32First
Thread32First.argtypes = [wintypes.HANDLE, ctypes.POINTER(THREADENTRY32)]
Thread32First.restype = ctypes.c_int
Thread32Next = ctypes.windll.kernel32.Thread32Next
Thread32Next.argtypes = [wintypes.HANDLE, ctypes.POINTER(THREADENTRY32)]
Thread32Next.restype = ctypes.c_int
INVALID_HANDLE_VALUE = -1
TH32CS_SNAPMODULE = 0x00000008
TH32CS_SNAPPROCESS = 0x00000002
TH32CS_SNAPTHREAD = 0x00000004
# ---------------copyed from header
DELETE = 0x00010000
READ_CONTROL = 0x00020000
WRITE_DAC = 0x00040000
WRITE_OWNER = 0x00080000
SYNCHRONIZE = 0x00100000
STANDARD_RIGHTS_REQUIRED = 0x000F0000
STANDARD_RIGHTS_READ = READ_CONTROL
STANDARD_RIGHTS_WRITE = READ_CONTROL
STANDARD_RIGHTS_EXECUTE = READ_CONTROL
STANDARD_RIGHTS_ALL = 0x001F0000
SPECIFIC_RIGHTS_ALL = 0x0000FFFF
ACCESS_SYSTEM_SECURITY = 0x01000000
MAXIMUM_ALLOWED = 0x02000000
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
GENERIC_EXECUTE = 0x20000000
GENERIC_ALL = 0x10000000
PROCESS_TERMINATE = 0x0001
PROCESS_CREATE_THREAD = 0x0002
PROCESS_SET_SESSIONID = 0x0004
PROCESS_VM_OPERATION = 0x0008
PROCESS_VM_READ = 0x0010
PROCESS_VM_WRITE = 0x0020
PROCESS_DUP_HANDLE = 0x0040
PROCESS_CREATE_PROCESS = 0x0080
PROCESS_SET_QUOTA = 0x0100
PROCESS_SET_INFORMATION = 0x0200
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_SUSPEND_RESUME = 0x0800
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
PROCESS_SET_LIMITED_INFORMATION = 0x2000
PROCESS_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0xFFFF
# ------------begin
Module = namedtuple(
"Module",
[
"dwSize",
"th32ModuleID",
"th32ProcessID",
"GlblcntUsage",
"ProccntUsage",
"modBaseAddr",
"modBaseSize",
"hModule",
"szModule",
"szExePath",
],
)
Process = namedtuple(
"Process",
[
"dwSize",
"cntUsage",
"th32ProcessID",
"th32DefaultHeapID",
"th32ModuleID",
"cntThreads",
"th32ParentProcessID",
"pcPriClassBase",
"dwFlags",
"szExeFile",
],
)
Thread = namedtuple(
"Thread",
[
"dwSize",
"cntUsage",
"th32ThreadID",
"th32OwnerProcessID",
"tpBasePri",
"tpDeltaPri",
"dwFlags",
],
)
def module_to_tuple(mod):
return Module(
mod.dwSize,
mod.th32ModuleID,
mod.th32ProcessID,
mod.GlblcntUsage,
mod.ProccntUsage,
mod.modBaseAddr,
mod.modBaseSize,
mod.hModule,
mod.szModule,
mod.szExePath,
)
def process_to_tuple(process):
return Process(
process.dwSize,
process.cntUsage,
process.th32ProcessID,
process.th32DefaultHeapID,
process.th32ModuleID,
process.cntThreads,
process.th32ParentProcessID,
process.pcPriClassBase,
process.dwFlags,
process.szExeFile,
)
def thread_to_tuple(thread):
return Thread(
thread.dwSize,
thread.cntUsage,
thread.th32ThreadID,
thread.th32OwnerProcessID,
thread.tpBasePri,
thread.tpDeltaPri,
thread.dwFlags,
)
class ModuleIter:
# cdef:
# HANDLE snap_shot
# MODULEENTRY32W mod
# bint entered
@staticmethod
def from_ptr(snap_shot):
self = ModuleIter.__new__(ModuleIter)
self.snap_shot = snap_shot
self.mod = MODULEENTRY32W()
self.mod.dwSize = ctypes.sizeof(MODULEENTRY32W)
self.entered = False
return self
def __del__(self):
CloseHandle(self.snap_shot)
def __iter__(self):
return self
def __next__(self):
if not self.entered:
more = Module32FirstW(self.snap_shot, ctypes.byref(self.mod))
if not more:
raise StopIteration
self.entered = True
return module_to_tuple(self.mod)
else:
more = Module32NextW(self.snap_shot, ctypes.byref(self.mod))
if not more:
raise StopIteration
return module_to_tuple(self.mod)
class ProcessIter:
@staticmethod
def from_ptr(snap_shot):
self = ProcessIter.__new__(ProcessIter)
self.snap_shot = snap_shot
self.process = PROCESSENTRY32W()
self.process.dwSize = ctypes.sizeof(PROCESSENTRY32W)
self.entered = False
return self
def __del__(self):
CloseHandle(self.snap_shot)
def __iter__(self):
return self
def __next__(self):
if not self.entered:
more = Process32FirstW(self.snap_shot, ctypes.byref(self.process))
if not more:
raise StopIteration
self.entered = True
return process_to_tuple(self.process)
else:
more = Process32NextW(self.snap_shot, ctypes.byref(self.process))
if not more:
raise StopIteration
return process_to_tuple(self.process)
class ThreadIter:
# cdef:
# HANDLE snap_shot
# THREADENTRY32 thread
# bint entered
@staticmethod
def from_ptr(snap_shot):
self = ThreadIter.__new__(ThreadIter)
self.snap_shot = snap_shot
self.thread = THREADENTRY32()
self.thread.dwSize = ctypes.sizeof(THREADENTRY32)
self.entered = False
return self
def __del__(self):
CloseHandle(self.snap_shot)
def __iter__(self):
return self
def __next__(self):
if not self.entered:
more = Thread32First(self.snap_shot, ctypes.byref(self.thread))
if not more:
raise StopIteration
self.entered = True
return thread_to_tuple(self.thread)
else:
more = Thread32Next(self.snap_shot, ctypes.byref(self.thread))
if not more:
raise StopIteration
return thread_to_tuple(self.thread)
def iter_module(pid):
snap_shot = CreateToolhelp32Snapshot(TH32CS_SNAPMODULE, pid)
if snap_shot == INVALID_HANDLE_VALUE:
raise ValueError("failed to call CreateToolhelp32Snapshot")
return ModuleIter.from_ptr(snap_shot)
def iter_process(pid):
"""
:param pid: 0 means all
:return:
"""
snap_shot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, pid)
if snap_shot == INVALID_HANDLE_VALUE:
raise ValueError("failed to call CreateToolhelp32Snapshot")
return ProcessIter.from_ptr(snap_shot)
def iter_thread(pid):
snap_shot = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, pid)
if snap_shot == INVALID_HANDLE_VALUE:
raise ValueError("failed to call CreateToolhelp32Snapshot")
return ThreadIter.from_ptr(snap_shot)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment