Created
June 12, 2026 00:43
-
-
Save langseth/1ee8aa1bbe08cfc096f37eeb1c51d4c6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import ctypes | |
| import os | |
| import time | |
| from contextlib import contextmanager | |
| from ctypes import wintypes | |
| @contextmanager | |
| def overwrite(filename, retries=8, retry_delay=0.2): | |
| temp_filename = filename + '.tmp' | |
| with open(temp_filename, 'w') as f: | |
| yield f | |
| last_error = None | |
| for attempt in range(retries + 1): | |
| try: | |
| os.replace(temp_filename, filename) | |
| return | |
| except OSError as exc: | |
| last_error = exc | |
| if os.name != 'nt' or attempt == retries: | |
| break | |
| time.sleep(retry_delay) | |
| if os.name == 'nt': | |
| lockers = _windows_find_locking_processes(filename) | |
| if lockers: | |
| details = ', '.join( | |
| f"pid={proc['pid']} name={proc['name']}" for proc in lockers | |
| ) | |
| raise RuntimeError( | |
| f"Failed to replace '{filename}' after {retries + 1} attempts; " | |
| f"file is still in use by: {details}" | |
| ) from last_error | |
| raise RuntimeError( | |
| f"Failed to replace '{filename}' after {retries + 1} attempts" | |
| ) from last_error | |
| def _windows_find_locking_processes(path): | |
| if os.name != 'nt': | |
| return [] | |
| cch_rm_max_app_name = 255 | |
| cch_rm_max_svc_name = 63 | |
| error_more_data = 234 | |
| class FILETIME(ctypes.Structure): | |
| _fields_ = [ | |
| ('dwLowDateTime', wintypes.DWORD), | |
| ('dwHighDateTime', wintypes.DWORD), | |
| ] | |
| class RM_UNIQUE_PROCESS(ctypes.Structure): | |
| _fields_ = [ | |
| ('dwProcessId', wintypes.DWORD), | |
| ('ProcessStartTime', FILETIME), | |
| ] | |
| class RM_PROCESS_INFO(ctypes.Structure): | |
| _fields_ = [ | |
| ('Process', RM_UNIQUE_PROCESS), | |
| ('strAppName', wintypes.WCHAR * (cch_rm_max_app_name + 1)), | |
| ('strServiceShortName', wintypes.WCHAR * (cch_rm_max_svc_name + 1)), | |
| ('ApplicationType', wintypes.DWORD), | |
| ('AppStatus', wintypes.ULONG), | |
| ('TSSessionId', wintypes.DWORD), | |
| ('bRestartable', wintypes.BOOL), | |
| ] | |
| session = wintypes.DWORD(0) | |
| session_key = ctypes.create_unicode_buffer(64) | |
| rstrtmgr = ctypes.WinDLL('Rstrtmgr') | |
| rm_start_session = rstrtmgr.RmStartSession | |
| rm_start_session.argtypes = [ | |
| ctypes.POINTER(wintypes.DWORD), | |
| wintypes.DWORD, | |
| wintypes.LPWSTR, | |
| ] | |
| rm_start_session.restype = wintypes.DWORD | |
| rm_register_resources = rstrtmgr.RmRegisterResources | |
| rm_register_resources.argtypes = [ | |
| wintypes.DWORD, | |
| wintypes.UINT, | |
| ctypes.POINTER(wintypes.LPCWSTR), | |
| wintypes.UINT, | |
| ctypes.c_void_p, | |
| wintypes.UINT, | |
| ctypes.c_void_p, | |
| ] | |
| rm_register_resources.restype = wintypes.DWORD | |
| rm_get_list = rstrtmgr.RmGetList | |
| rm_get_list.argtypes = [ | |
| wintypes.DWORD, | |
| ctypes.POINTER(wintypes.UINT), | |
| ctypes.POINTER(wintypes.UINT), | |
| ctypes.POINTER(RM_PROCESS_INFO), | |
| ctypes.POINTER(wintypes.DWORD), | |
| ] | |
| rm_get_list.restype = wintypes.DWORD | |
| rm_end_session = rstrtmgr.RmEndSession | |
| rm_end_session.argtypes = [wintypes.DWORD] | |
| rm_end_session.restype = wintypes.DWORD | |
| result = rm_start_session(ctypes.byref(session), 0, session_key) | |
| if result != 0: | |
| return [] | |
| try: | |
| resources = (wintypes.LPCWSTR * 1)(path) | |
| result = rm_register_resources(session.value, 1, resources, 0, None, 0, None) | |
| if result != 0: | |
| return [] | |
| needed = wintypes.UINT(0) | |
| count = wintypes.UINT(0) | |
| reboot_reasons = wintypes.DWORD(0) | |
| result = rm_get_list( | |
| session.value, | |
| ctypes.byref(needed), | |
| ctypes.byref(count), | |
| None, | |
| ctypes.byref(reboot_reasons), | |
| ) | |
| if result == 0 and needed.value == 0: | |
| return [] | |
| if result not in (0, error_more_data): | |
| return [] | |
| proc_array = (RM_PROCESS_INFO * needed.value)() | |
| count = wintypes.UINT(needed.value) | |
| result = rm_get_list( | |
| session.value, | |
| ctypes.byref(needed), | |
| ctypes.byref(count), | |
| proc_array, | |
| ctypes.byref(reboot_reasons), | |
| ) | |
| if result != 0: | |
| return [] | |
| return [ | |
| { | |
| 'pid': int(proc_array[i].Process.dwProcessId), | |
| 'name': str(proc_array[i].strAppName), | |
| } | |
| for i in range(count.value) | |
| ] | |
| finally: | |
| rm_end_session(session.value) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment