Created
October 5, 2022 16:21
-
-
Save synodriver/de31139d8d712acae1ec4942d6dc832f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import ctypes | |
from collections import namedtuple | |
from ctypes import wintypes | |
# import win32process | |
class MODULEENTRY32W(ctypes.Structure):
    """ctypes layout of the Win32 ``MODULEENTRY32W`` structure.

    One instance describes a single module of a snapshot taken with
    ``TH32CS_SNAPMODULE``.  ``dwSize`` must be set to
    ``ctypes.sizeof(MODULEENTRY32W)`` before the structure is handed to
    ``Module32FirstW``.
    """

    _fields_ = [
        ("dwSize", wintypes.DWORD),
        ("th32ModuleID", wintypes.DWORD),
        ("th32ProcessID", wintypes.DWORD),
        ("GlblcntUsage", wintypes.DWORD),
        ("ProccntUsage", wintypes.DWORD),
        ("modBaseAddr", wintypes.PBYTE),
        ("modBaseSize", wintypes.DWORD),
        ("hModule", wintypes.HMODULE),
        # MAX_MODULE_NAME32 + 1 wide characters.
        ("szModule", wintypes.WCHAR * 256),
        # MAX_PATH wide characters.
        ("szExePath", wintypes.WCHAR * 260),
    ]
class PROCESSENTRY32W(ctypes.Structure):
    """ctypes layout of the Win32 ``PROCESSENTRY32W`` structure.

    One instance describes a single process of a snapshot taken with
    ``TH32CS_SNAPPROCESS``.  ``dwSize`` must be set to
    ``ctypes.sizeof(PROCESSENTRY32W)`` before the structure is handed to
    ``Process32FirstW``.
    """

    _fields_ = [
        ("dwSize", wintypes.DWORD),
        ("cntUsage", wintypes.DWORD),
        ("th32ProcessID", wintypes.DWORD),
        # ULONG_PTR in the header: pointer-sized, so it must follow the
        # interpreter's bitness.  A fixed 64-bit integer shifts every later
        # field (and the reported dwSize) on 32-bit Python.
        ("th32DefaultHeapID", ctypes.c_size_t),
        ("th32ModuleID", wintypes.DWORD),
        ("cntThreads", wintypes.DWORD),
        ("th32ParentProcessID", wintypes.DWORD),
        ("pcPriClassBase", wintypes.LONG),
        ("dwFlags", wintypes.DWORD),
        # MAX_PATH wide characters.
        ("szExeFile", wintypes.WCHAR * 260),
    ]
class THREADENTRY32(ctypes.Structure):
    """ctypes layout of the Win32 ``THREADENTRY32`` structure.

    One instance describes a single thread of a snapshot taken with
    ``TH32CS_SNAPTHREAD``.  ``dwSize`` must be set to
    ``ctypes.sizeof(THREADENTRY32)`` before the structure is handed to
    ``Thread32First``.
    """

    _fields_ = [
        ("dwSize", wintypes.DWORD),
        ("cntUsage", wintypes.DWORD),
        ("th32ThreadID", wintypes.DWORD),
        ("th32OwnerProcessID", wintypes.DWORD),
        ("tpBasePri", wintypes.LONG),
        ("tpDeltaPri", wintypes.LONG),
        ("dwFlags", wintypes.DWORD),
    ]
# Prototypes for the kernel32 functions used by this module.
#
# A private WinDLL handle is used instead of ``ctypes.windll.kernel32``: the
# latter is a process-wide cached object, so assigning argtypes/restype on its
# functions would silently change the prototypes seen by every other library
# that also goes through ``ctypes.windll``.  ``use_last_error=True`` makes the
# Win32 error code of a failed call available via ``ctypes.get_last_error()``.
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

CloseHandle = _kernel32.CloseHandle
CloseHandle.argtypes = [wintypes.HANDLE]
CloseHandle.restype = wintypes.BOOL

OpenProcess = _kernel32.OpenProcess
# (dwDesiredAccess, bInheritHandle, dwProcessId)
OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
OpenProcess.restype = wintypes.HANDLE

GetPriorityClass = _kernel32.GetPriorityClass
GetPriorityClass.argtypes = [wintypes.HANDLE]
GetPriorityClass.restype = wintypes.DWORD

CreateToolhelp32Snapshot = _kernel32.CreateToolhelp32Snapshot
# (dwFlags, th32ProcessID)
CreateToolhelp32Snapshot.argtypes = [wintypes.DWORD, wintypes.DWORD]
CreateToolhelp32Snapshot.restype = wintypes.HANDLE

Module32FirstW = _kernel32.Module32FirstW
Module32FirstW.argtypes = [wintypes.HANDLE, ctypes.POINTER(MODULEENTRY32W)]
Module32FirstW.restype = wintypes.BOOL

Module32NextW = _kernel32.Module32NextW
Module32NextW.argtypes = [wintypes.HANDLE, ctypes.POINTER(MODULEENTRY32W)]
Module32NextW.restype = wintypes.BOOL

Process32FirstW = _kernel32.Process32FirstW
Process32FirstW.argtypes = [wintypes.HANDLE, ctypes.POINTER(PROCESSENTRY32W)]
Process32FirstW.restype = wintypes.BOOL

Process32NextW = _kernel32.Process32NextW
Process32NextW.argtypes = [wintypes.HANDLE, ctypes.POINTER(PROCESSENTRY32W)]
Process32NextW.restype = wintypes.BOOL

Thread32First = _kernel32.Thread32First
Thread32First.argtypes = [wintypes.HANDLE, ctypes.POINTER(THREADENTRY32)]
Thread32First.restype = wintypes.BOOL

Thread32Next = _kernel32.Thread32Next
Thread32Next.argtypes = [wintypes.HANDLE, ctypes.POINTER(THREADENTRY32)]
Thread32Next.restype = wintypes.BOOL
# (HANDLE)-1 expressed the way ctypes reports it: a function whose restype is
# a HANDLE (c_void_p) returns an *unsigned* pointer-sized integer, so a plain
# -1 would never compare equal to a failed call's result.
INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value

# CreateToolhelp32Snapshot flags.
TH32CS_SNAPMODULE = 0x00000008
TH32CS_SNAPPROCESS = 0x00000002
TH32CS_SNAPTHREAD = 0x00000004

# --------------- copied from header
# Standard access rights.
DELETE = 0x00010000
READ_CONTROL = 0x00020000
WRITE_DAC = 0x00040000
WRITE_OWNER = 0x00080000
SYNCHRONIZE = 0x00100000
STANDARD_RIGHTS_REQUIRED = 0x000F0000
STANDARD_RIGHTS_READ = READ_CONTROL
STANDARD_RIGHTS_WRITE = READ_CONTROL
STANDARD_RIGHTS_EXECUTE = READ_CONTROL
STANDARD_RIGHTS_ALL = 0x001F0000
SPECIFIC_RIGHTS_ALL = 0x0000FFFF
ACCESS_SYSTEM_SECURITY = 0x01000000
MAXIMUM_ALLOWED = 0x02000000

# Generic access rights.
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
GENERIC_EXECUTE = 0x20000000
GENERIC_ALL = 0x10000000

# Process-specific access rights (for OpenProcess).
PROCESS_TERMINATE = 0x0001
PROCESS_CREATE_THREAD = 0x0002
PROCESS_SET_SESSIONID = 0x0004
PROCESS_VM_OPERATION = 0x0008
PROCESS_VM_READ = 0x0010
PROCESS_VM_WRITE = 0x0020
PROCESS_DUP_HANDLE = 0x0040
PROCESS_CREATE_PROCESS = 0x0080
PROCESS_SET_QUOTA = 0x0100
PROCESS_SET_INFORMATION = 0x0200
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_SUSPEND_RESUME = 0x0800
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
PROCESS_SET_LIMITED_INFORMATION = 0x2000
PROCESS_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0xFFFF
# ------------begin | |
# Immutable, plain-Python snapshots of one enumeration entry each.  The field
# names and order mirror MODULEENTRY32W, PROCESSENTRY32W and THREADENTRY32.
Module = namedtuple(
    "Module",
    [
        "dwSize",
        "th32ModuleID",
        "th32ProcessID",
        "GlblcntUsage",
        "ProccntUsage",
        "modBaseAddr",
        "modBaseSize",
        "hModule",
        "szModule",
        "szExePath",
    ],
)

Process = namedtuple(
    "Process",
    [
        "dwSize",
        "cntUsage",
        "th32ProcessID",
        "th32DefaultHeapID",
        "th32ModuleID",
        "cntThreads",
        "th32ParentProcessID",
        "pcPriClassBase",
        "dwFlags",
        "szExeFile",
    ],
)

Thread = namedtuple(
    "Thread",
    [
        "dwSize",
        "cntUsage",
        "th32ThreadID",
        "th32OwnerProcessID",
        "tpBasePri",
        "tpDeltaPri",
        "dwFlags",
    ],
)
def module_to_tuple(mod):
    """Copy the fields of a ``MODULEENTRY32W`` into a ``Module`` namedtuple.

    :param mod: the structure to read; it is not modified.
    :return: a ``Module`` whose fields keep their values even if ``mod`` is
        overwritten afterwards.
    """
    # Reading a pointer field from a ctypes structure does not copy it: the
    # returned wrapper still views the structure's own buffer.  Without an
    # explicit copy, a tuple produced earlier would change its modBaseAddr as
    # soon as the same structure is refilled with the next entry.  Integer and
    # character-array fields are converted to Python values and need no copy.
    base_addr = mod.modBaseAddr
    detached_base_addr = type(base_addr).from_buffer_copy(base_addr)
    return Module(
        mod.dwSize,
        mod.th32ModuleID,
        mod.th32ProcessID,
        mod.GlblcntUsage,
        mod.ProccntUsage,
        detached_base_addr,
        mod.modBaseSize,
        mod.hModule,
        mod.szModule,
        mod.szExePath,
    )
def process_to_tuple(process):
    """Copy the fields of a ``PROCESSENTRY32W`` into a ``Process`` namedtuple.

    :param process: the structure to read; it is not modified.
    :return: a ``Process`` holding the value of every field, in declaration
        order.
    """
    return Process(
        process.dwSize,
        process.cntUsage,
        process.th32ProcessID,
        process.th32DefaultHeapID,
        process.th32ModuleID,
        process.cntThreads,
        process.th32ParentProcessID,
        process.pcPriClassBase,
        process.dwFlags,
        process.szExeFile,
    )
def thread_to_tuple(thread):
    """Copy the fields of a ``THREADENTRY32`` into a ``Thread`` namedtuple.

    :param thread: the structure to read; it is not modified.
    :return: a ``Thread`` holding the value of every field, in declaration
        order.
    """
    return Thread(
        thread.dwSize,
        thread.cntUsage,
        thread.th32ThreadID,
        thread.th32OwnerProcessID,
        thread.tpBasePri,
        thread.tpDeltaPri,
        thread.dwFlags,
    )
class ModuleIter:
    """Iterator over the module entries of a Toolhelp32 snapshot.

    Instances are built with :meth:`from_ptr`, which takes ownership of the
    snapshot handle; the handle is closed when the iterator is finalized.
    Each step yields a ``Module`` namedtuple.
    """

    # Instance attributes (set by from_ptr):
    #   snap_shot - snapshot HANDLE owned by this iterator
    #   mod       - MODULEENTRY32W buffer reused for every step
    #   entered   - True once Module32FirstW has succeeded

    @staticmethod
    def from_ptr(snap_shot):
        """Wrap an open snapshot handle.

        :param snap_shot: handle returned by ``CreateToolhelp32Snapshot``.
        :return: a new ``ModuleIter`` positioned before the first entry.
        """
        self = ModuleIter.__new__(ModuleIter)
        self.snap_shot = snap_shot
        self.mod = MODULEENTRY32W()
        # The API rejects the structure unless dwSize is filled in first.
        self.mod.dwSize = ctypes.sizeof(MODULEENTRY32W)
        self.entered = False
        return self

    def __del__(self):
        # The handle only exists when the instance came from from_ptr();
        # a directly constructed instance has nothing to release.
        snap_shot = getattr(self, "snap_shot", None)
        if snap_shot is not None:
            # Forget the handle first so it can never be closed twice.
            self.snap_shot = None
            CloseHandle(snap_shot)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next entry as a ``Module``; raise ``StopIteration`` when
        the API reports no further entry."""
        # The first step must use the *First* call, every later one *Next*.
        advance = Module32NextW if self.entered else Module32FirstW
        if not advance(self.snap_shot, ctypes.byref(self.mod)):
            raise StopIteration
        self.entered = True
        return module_to_tuple(self.mod)
class ProcessIter:
    """Iterator over the process entries of a Toolhelp32 snapshot.

    Instances are built with :meth:`from_ptr`, which takes ownership of the
    snapshot handle; the handle is closed when the iterator is finalized.
    Each step yields a ``Process`` namedtuple.
    """

    # Instance attributes (set by from_ptr):
    #   snap_shot - snapshot HANDLE owned by this iterator
    #   process   - PROCESSENTRY32W buffer reused for every step
    #   entered   - True once Process32FirstW has succeeded

    @staticmethod
    def from_ptr(snap_shot):
        """Wrap an open snapshot handle.

        :param snap_shot: handle returned by ``CreateToolhelp32Snapshot``.
        :return: a new ``ProcessIter`` positioned before the first entry.
        """
        self = ProcessIter.__new__(ProcessIter)
        self.snap_shot = snap_shot
        self.process = PROCESSENTRY32W()
        # The API rejects the structure unless dwSize is filled in first.
        self.process.dwSize = ctypes.sizeof(PROCESSENTRY32W)
        self.entered = False
        return self

    def __del__(self):
        # The handle only exists when the instance came from from_ptr();
        # a directly constructed instance has nothing to release.
        snap_shot = getattr(self, "snap_shot", None)
        if snap_shot is not None:
            # Forget the handle first so it can never be closed twice.
            self.snap_shot = None
            CloseHandle(snap_shot)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next entry as a ``Process``; raise ``StopIteration``
        when the API reports no further entry."""
        # The first step must use the *First* call, every later one *Next*.
        advance = Process32NextW if self.entered else Process32FirstW
        if not advance(self.snap_shot, ctypes.byref(self.process)):
            raise StopIteration
        self.entered = True
        return process_to_tuple(self.process)
class ThreadIter:
    """Iterator over the thread entries of a Toolhelp32 snapshot.

    Instances are built with :meth:`from_ptr`, which takes ownership of the
    snapshot handle; the handle is closed when the iterator is finalized.
    Each step yields a ``Thread`` namedtuple.
    """

    # Instance attributes (set by from_ptr):
    #   snap_shot - snapshot HANDLE owned by this iterator
    #   thread    - THREADENTRY32 buffer reused for every step
    #   entered   - True once Thread32First has succeeded

    @staticmethod
    def from_ptr(snap_shot):
        """Wrap an open snapshot handle.

        :param snap_shot: handle returned by ``CreateToolhelp32Snapshot``.
        :return: a new ``ThreadIter`` positioned before the first entry.
        """
        self = ThreadIter.__new__(ThreadIter)
        self.snap_shot = snap_shot
        self.thread = THREADENTRY32()
        # The API rejects the structure unless dwSize is filled in first.
        self.thread.dwSize = ctypes.sizeof(THREADENTRY32)
        self.entered = False
        return self

    def __del__(self):
        # The handle only exists when the instance came from from_ptr();
        # a directly constructed instance has nothing to release.
        snap_shot = getattr(self, "snap_shot", None)
        if snap_shot is not None:
            # Forget the handle first so it can never be closed twice.
            self.snap_shot = None
            CloseHandle(snap_shot)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next entry as a ``Thread``; raise ``StopIteration`` when
        the API reports no further entry."""
        # The first step must use the *First* call, every later one *Next*.
        advance = Thread32Next if self.entered else Thread32First
        if not advance(self.snap_shot, ctypes.byref(self.thread)):
            raise StopIteration
        self.entered = True
        return thread_to_tuple(self.thread)
def iter_module(pid):
    """Iterate over the modules loaded in a process.

    :param pid: identifier of the process to inspect, passed to
        ``CreateToolhelp32Snapshot`` as ``th32ProcessID``.
    :return: a ``ModuleIter`` yielding one ``Module`` namedtuple per module.
    :raises ValueError: if the snapshot could not be created.
    """
    snap_shot = CreateToolhelp32Snapshot(TH32CS_SNAPMODULE, pid)
    # A HANDLE result arrives as an unsigned pointer-sized integer, so the
    # failure value is also checked in its unsigned (HANDLE)-1 form.
    if snap_shot in (INVALID_HANDLE_VALUE, ctypes.c_void_p(-1).value):
        raise ValueError("failed to call CreateToolhelp32Snapshot")
    return ModuleIter.from_ptr(snap_shot)
def iter_process(pid):
    """Iterate over the processes in the system.

    :param pid: passed to ``CreateToolhelp32Snapshot`` as ``th32ProcessID``.
        Per the Win32 documentation this argument is ignored for
        ``TH32CS_SNAPPROCESS`` snapshots, so every process is included
        whatever value is given (conventionally 0).
    :return: a ``ProcessIter`` yielding one ``Process`` namedtuple per process.
    :raises ValueError: if the snapshot could not be created.
    """
    snap_shot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, pid)
    # A HANDLE result arrives as an unsigned pointer-sized integer, so the
    # failure value is also checked in its unsigned (HANDLE)-1 form.
    if snap_shot in (INVALID_HANDLE_VALUE, ctypes.c_void_p(-1).value):
        raise ValueError("failed to call CreateToolhelp32Snapshot")
    return ProcessIter.from_ptr(snap_shot)
def iter_thread(pid):
    """Iterate over the threads in the system.

    :param pid: passed to ``CreateToolhelp32Snapshot`` as ``th32ProcessID``.
        Per the Win32 documentation this argument is ignored for
        ``TH32CS_SNAPTHREAD`` snapshots: the result covers the threads of
        *all* processes.  To keep only one process's threads, filter the
        yielded entries on ``th32OwnerProcessID``.
    :return: a ``ThreadIter`` yielding one ``Thread`` namedtuple per thread.
    :raises ValueError: if the snapshot could not be created.
    """
    snap_shot = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, pid)
    # A HANDLE result arrives as an unsigned pointer-sized integer, so the
    # failure value is also checked in its unsigned (HANDLE)-1 form.
    if snap_shot in (INVALID_HANDLE_VALUE, ctypes.c_void_p(-1).value):
        raise ValueError("failed to call CreateToolhelp32Snapshot")
    return ThreadIter.from_ptr(snap_shot)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment