Skip to content

Instantly share code, notes, and snippets.

@langseth
Created June 12, 2026 00:43
Show Gist options
  • Select an option

  • Save langseth/1ee8aa1bbe08cfc096f37eeb1c51d4c6 to your computer and use it in GitHub Desktop.

Select an option

Save langseth/1ee8aa1bbe08cfc096f37eeb1c51d4c6 to your computer and use it in GitHub Desktop.
import ctypes
import os
import time
from contextlib import contextmanager
from ctypes import wintypes
@contextmanager
def overwrite(filename, retries=8, retry_delay=0.2):
    """Write to a temporary file, then move it over *filename*.

    Yields a text-mode file object opened on ``filename + '.tmp'``. When the
    ``with`` body completes normally, the temporary file is closed and moved
    onto *filename* with ``os.replace``. If the body raises, the temporary
    file is deleted, *filename* is left untouched, and the exception
    propagates unchanged.

    Args:
        filename: Path (``str``) of the file to replace.
        retries: Number of extra ``os.replace`` attempts made on Windows
            after the first one fails. Negative values are treated as 0.
            On other platforms a failed replace is never retried.
        retry_delay: Seconds to sleep between attempts on Windows.

    Raises:
        RuntimeError: If the replace ultimately fails. The triggering
            ``OSError`` is attached as ``__cause__``. On Windows the message
            lists the processes holding the file when they can be
            determined. The temporary file is kept in this case so the
            freshly written content is not lost.
    """
    temp_filename = filename + '.tmp'
    temp_file = open(temp_filename, 'w')
    try:
        with temp_file:
            yield temp_file
    except BaseException:
        # The body (or the final flush/close) failed: the temporary file is
        # incomplete, so remove it instead of leaving it behind. The file is
        # already closed here, which matters on Windows.
        try:
            os.remove(temp_filename)
        except OSError:
            pass
        raise

    max_attempts = max(retries, 0) + 1
    attempts = 0
    last_error = None
    for attempt in range(max_attempts):
        attempts = attempt + 1
        try:
            os.replace(temp_filename, filename)
            return
        except OSError as exc:
            last_error = exc
        # Only Windows is retried: there the failure is commonly a transient
        # lock held by another process.
        if os.name != 'nt' or attempts == max_attempts:
            break
        time.sleep(retry_delay)

    # Report the number of attempts actually made, not the configured maximum.
    plural = '' if attempts == 1 else 's'
    summary = f"Failed to replace '{filename}' after {attempts} attempt{plural}"
    if os.name == 'nt':
        lockers = _windows_find_locking_processes(filename)
        if lockers:
            details = ', '.join(
                f"pid={proc['pid']} name={proc['name']}" for proc in lockers
            )
            raise RuntimeError(
                f"{summary}; file is still in use by: {details}"
            ) from last_error
    raise RuntimeError(summary) from last_error
def _windows_find_locking_processes(path):
    """Return the processes that currently hold *path* open (Windows only).

    Uses the Windows Restart Manager API (``Rstrtmgr.dll``) through ctypes.
    This is a best-effort diagnostic: any failure yields an empty list
    rather than an exception, so it is safe to call while handling another
    error.

    Args:
        path: Path of the file to inspect. It is converted to an absolute
            path because the Restart Manager expects full file names.

    Returns:
        A list of ``{'pid': int, 'name': str}`` dicts, one per process
        reported by the Restart Manager. Empty on non-Windows platforms,
        when nothing holds the file, or when any API call fails.
    """
    if os.name != 'nt':
        return []

    # Buffer sizes and error code from the Windows SDK headers.
    cch_rm_max_app_name = 255
    cch_rm_max_svc_name = 63
    error_more_data = 234
    # Upper bound on buffer-resize rounds if the locker list keeps growing.
    max_list_rounds = 4

    class FILETIME(ctypes.Structure):
        _fields_ = [
            ('dwLowDateTime', wintypes.DWORD),
            ('dwHighDateTime', wintypes.DWORD),
        ]

    class RM_UNIQUE_PROCESS(ctypes.Structure):
        _fields_ = [
            ('dwProcessId', wintypes.DWORD),
            ('ProcessStartTime', FILETIME),
        ]

    class RM_PROCESS_INFO(ctypes.Structure):
        _fields_ = [
            ('Process', RM_UNIQUE_PROCESS),
            ('strAppName', wintypes.WCHAR * (cch_rm_max_app_name + 1)),
            ('strServiceShortName', wintypes.WCHAR * (cch_rm_max_svc_name + 1)),
            ('ApplicationType', wintypes.DWORD),
            ('AppStatus', wintypes.ULONG),
            ('TSSessionId', wintypes.DWORD),
            ('bRestartable', wintypes.BOOL),
        ]

    try:
        rstrtmgr = ctypes.WinDLL('Rstrtmgr')
    except OSError:
        # Library unavailable: stay best-effort instead of masking the
        # caller's original error with a loader failure.
        return []

    rm_start_session = rstrtmgr.RmStartSession
    rm_start_session.argtypes = [
        ctypes.POINTER(wintypes.DWORD),
        wintypes.DWORD,
        wintypes.LPWSTR,
    ]
    rm_start_session.restype = wintypes.DWORD

    rm_register_resources = rstrtmgr.RmRegisterResources
    rm_register_resources.argtypes = [
        wintypes.DWORD,
        wintypes.UINT,
        ctypes.POINTER(wintypes.LPCWSTR),
        wintypes.UINT,
        ctypes.c_void_p,
        wintypes.UINT,
        ctypes.c_void_p,
    ]
    rm_register_resources.restype = wintypes.DWORD

    rm_get_list = rstrtmgr.RmGetList
    rm_get_list.argtypes = [
        wintypes.DWORD,
        ctypes.POINTER(wintypes.UINT),
        ctypes.POINTER(wintypes.UINT),
        ctypes.POINTER(RM_PROCESS_INFO),
        ctypes.POINTER(wintypes.DWORD),
    ]
    rm_get_list.restype = wintypes.DWORD

    rm_end_session = rstrtmgr.RmEndSession
    rm_end_session.argtypes = [wintypes.DWORD]
    rm_end_session.restype = wintypes.DWORD

    # RmRegisterResources is documented to take full file paths.
    full_path = os.path.abspath(os.fsdecode(path))

    session = wintypes.DWORD(0)
    session_key = ctypes.create_unicode_buffer(64)
    if rm_start_session(ctypes.byref(session), 0, session_key) != 0:
        return []

    try:
        resources = (wintypes.LPCWSTR * 1)(full_path)
        status = rm_register_resources(
            session.value, 1, resources, 0, None, 0, None
        )
        if status != 0:
            return []

        needed = wintypes.UINT(0)
        count = wintypes.UINT(0)
        reboot_reasons = wintypes.DWORD(0)
        proc_array = None  # The first call only asks for the required size.

        for _ in range(max_list_rounds):
            status = rm_get_list(
                session.value,
                ctypes.byref(needed),
                ctypes.byref(count),
                proc_array,
                ctypes.byref(reboot_reasons),
            )
            if status not in (0, error_more_data):
                return []
            if status == 0:
                if needed.value == 0:
                    return []
                if proc_array is not None:
                    break
            # Either the sizing call or a list that grew between calls:
            # allocate a buffer of the size just reported and ask again.
            proc_array = (RM_PROCESS_INFO * needed.value)()
            count = wintypes.UINT(needed.value)
        else:
            return []

        filled = min(count.value, len(proc_array))
        return [
            {
                'pid': int(proc_array[i].Process.dwProcessId),
                'name': str(proc_array[i].strAppName),
            }
            for i in range(filled)
        ]
    finally:
        # Always release the Restart Manager session.
        rm_end_session(session.value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment