Created
September 30, 2024 17:16
-
-
Save ysc3839/26044b9a168f27d649fa7e491c889496 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ProcessNameSetter:
    """Give processes created by ``multiprocessing`` a recognisable name.

    ``register()`` installs ``afterfork`` through
    ``multiprocessing.util.register_after_fork``; the hook sets the process
    name to ``'pymp:'`` followed by the current multiprocessing process name.
    """

    # The instance the hook was registered with; stays None until
    # registration has actually succeeded.  Holding it here also keeps the
    # object referenced for as long as the class exists.
    instance = None

    @classmethod
    def register(cls):
        """Install the after-fork hook once; later calls are no-ops.

        Registration is best effort: any error is printed as a traceback
        instead of being raised, and ``instance`` is left as ``None`` so a
        later call can try again.
        """
        if cls.instance is not None:
            return
        try:
            import multiprocessing.util

            hook_owner = cls()
            multiprocessing.util.register_after_fork(hook_owner, cls.afterfork)
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must keep propagating.
            import traceback

            traceback.print_exc()
        else:
            # Only record the instance once the hook is really installed.
            cls.instance = hook_owner

    def afterfork(self):
        """Rename the current process to ``pymp:<multiprocessing process name>``."""
        import multiprocessing

        import procname

        procname.set_process_name('pymp:' + multiprocessing.current_process().name)


# Install the hook as soon as this module is imported.
ProcessNameSetter.register()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ctypes, sys | |
def get_orig_argv():
    """Return the argument list the interpreter was started with.

    ``sys.orig_argv`` is returned as-is when the running interpreter has it
    (and it is not ``None``).  Otherwise the list is fetched from the C API
    function ``Py_GetArgcArgv`` through ctypes.
    """
    known = getattr(sys, 'orig_argv', None)
    if known is not None:
        return known

    # Out-parameters for Py_GetArgcArgv(int *argc, wchar_t ***argv).
    count = ctypes.c_int()
    vector = ctypes.POINTER(ctypes.c_wchar_p)()
    ctypes.pythonapi.Py_GetArgcArgv(ctypes.byref(count), ctypes.byref(vector))

    # Slicing the pointer copies exactly ``count`` entries into a list.
    return vector[:count.value]
# How far below the first environment string get_argv_buf() is willing to
# scan backwards (in bytes) before giving up.
ARG_MAX = 96 * 1024

# Pointer-to-size_t type, used to read a pointer-sized value out of the
# ``environ`` symbol.
c_size_t_p = ctypes.POINTER(ctypes.c_size_t)

# Library object wrapping handle 0, so symbol lookups behave like
# dlsym(RTLD_DEFAULT, ...).
RTLD_DEFAULT = ctypes.CDLL(None, handle=0)  # RTLD_DEFAULT = 0


def deref(p):
    """Return the unsigned byte stored at raw memory address *p*."""
    return ctypes.c_ubyte.from_address(p).value
def get_argv_buf():
    """Locate the memory area holding the process's original argv strings.

    The search starts at the first environment string (``environ[0]``) and
    walks backwards one NUL-terminated string per argument.
    NOTE(review): this relies on the argv strings sitting contiguously and
    immediately before the environment strings, and on ``environ[0]`` still
    pointing at the original environment block - confirm for the target
    platform.

    Returns:
        ``(address, size)`` where *address* is the start of ``argv[0]`` and
        *size* is the number of bytes from there up to, but not including,
        the terminating NUL of the last argument.  ``(None, None)`` when the
        area cannot be located or verified.
    """
    argv = get_orig_argv()
    if not argv:
        return None, None

    # Work in bytes: the in-memory length of argv[0] is its encoded length,
    # not its character count, and arguments that were decoded with
    # surrogateescape must encode back without raising.
    argv0 = argv[0].encode(sys.getfilesystemencoding(),
                           sys.getfilesystemencodeerrors())

    environ_p = c_size_t_p.in_dll(RTLD_DEFAULT, 'environ')  # dlsym(RTLD_DEFAULT, "environ")
    if not environ_p:
        # ``environ`` itself is NULL; nothing to anchor the search on.
        return None, None
    ptr = environ_p.contents.value  # environ[0] as an integer address
    if not ptr:
        # Empty environment: environ[0] is NULL, so there is no anchor.
        return None, None

    limit = ptr - ARG_MAX
    ptr -= 1  # expected terminator of the last argument
    upper = ptr

    # Step back over every argument except argv[0]; afterwards ``ptr`` sits
    # on the terminator of argv[0].
    for _ in range(len(argv) - 1):
        if deref(ptr) != 0:
            return None, None
        ptr -= 1
        while deref(ptr) != 0 and ptr > limit:
            ptr -= 1
        if ptr <= limit:
            return None, None

    # Jump to where argv[0] must start and verify the bytes really match.
    ptr -= len(argv0)
    if ctypes.string_at(ptr) != argv0:
        return None, None
    return ptr, upper - ptr
# Cached result of get_argv_buf(): start address and byte size of the argv
# area.  Both stay None until a lookup succeeds.
argv_ptr = None
argv_size = None


def set_process_name(name):
    """Overwrite the process's argv memory area with *name*.

    The area is looked up on first use and cached in the module globals
    ``argv_ptr`` / ``argv_size``.  If it cannot be found the call does
    nothing.  *name* may be ``str`` (encoded with ``str.encode()``) or a
    bytes-like value; anything longer than the area is truncated.
    """
    global argv_ptr, argv_size

    if not argv_ptr:
        argv_ptr, argv_size = get_argv_buf()
        if not argv_ptr:
            return

    payload = name.encode() if isinstance(name, str) else name

    # Clear the whole area first so nothing of the old contents remains
    # after the new name.
    ctypes.memset(argv_ptr, 0, argv_size)
    copy_len = min(len(payload), argv_size)
    ctypes.memmove(argv_ptr, payload, copy_len)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment