Created
September 22, 2024 09:18
-
-
Save Summertime/21968bb4d6d89ac5c066adfba3924f28 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import ctypes | |
from ctypes import CDLL | |
import sys | |
import random | |
import string | |
from errno import EEXIST | |
# C stuff | |
libc = CDLL('libc.so.6', use_errno=True) | |
AT_EMPTY_PATH = 0x1000 | |
# Python's view of the world | |
FLAGS_W = os.O_CLOEXEC | os.O_CREAT | os.O_TRUNC | os.O_WRONLY | |
mktemp_chars = (string.ascii_letters + string.digits).encode('utf8') | |
class atomic_open: | |
__slots__ = ['_dirfd', '_basename','_fd', '_stream'] | |
def __init__(self, file, mode='r', **kwargs): | |
if not set(mode) <= { 'b', 'w' }: | |
raise ValueError("Mode must be 'w' or 'wb'") | |
dirname, self._basename = os.path.split(file) | |
dirname = dirname or '.' | |
# We use a dir fd so we can avoid issues with CWD changes | |
self._dirfd = os.open(dirname, flags = os.O_DIRECTORY) | |
flags = FLAGS_W | |
flags &= ~ os.O_CREAT | |
flags |= os.O_TMPFILE | |
self._fd = os.open(".", flags, dir_fd=self._dirfd) | |
# Wrap the fd in an io | |
self._stream = os.fdopen(self._fd, mode, **kwargs) | |
def __getattr__(self, key): | |
return getattr(object.__getattribute__(self, '_stream'), key) | |
def __enter__(self): | |
return self | |
def close(self): | |
for _ in range(1000): | |
tempname = b'.' + self._basename.encode('utf8') | |
tempname += b'.' + bytes(random.choices(mktemp_chars, k=10)) | |
retval = libc.linkat(self._fd, '', self._dirfd, tempname, AT_EMPTY_PATH) | |
if retval == 0: | |
break | |
errno = ctypes.get_errno() | |
if errno == EEXIST: | |
continue | |
raise OSError(errno, os.strerror(errno)) | |
else: | |
raise FileExistsError | |
os.fsync(self._fd) | |
self._stream.close() | |
os.rename(tempname, self._basename, src_dir_fd=self._dirfd, dst_dir_fd=self._dirfd) | |
def __exit__(self, exc_type, exc_value, traceback): | |
if exc_type is None: | |
self.close() | |
return | |
from pathlib import Path | |
print('start') | |
print(list(Path().iterdir())) | |
with atomic_open('potato','w') as f: | |
print('writing') | |
print(list(Path().iterdir())) | |
f.write('test\n') | |
print('finished') | |
print(list(Path().iterdir())) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment