Skip to content

Instantly share code, notes, and snippets.

@Summertime
Created September 22, 2024 09:18
Show Gist options
  • Save Summertime/21968bb4d6d89ac5c066adfba3924f28 to your computer and use it in GitHub Desktop.
Save Summertime/21968bb4d6d89ac5c066adfba3924f28 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import os
import ctypes
from ctypes import CDLL
import sys
import random
import string
from errno import EEXIST
# C stuff
libc = CDLL('libc.so.6', use_errno=True)
AT_EMPTY_PATH = 0x1000
# Python's view of the world
FLAGS_W = os.O_CLOEXEC | os.O_CREAT | os.O_TRUNC | os.O_WRONLY
mktemp_chars = (string.ascii_letters + string.digits).encode('utf8')
class atomic_open:
__slots__ = ['_dirfd', '_basename','_fd', '_stream']
def __init__(self, file, mode='r', **kwargs):
if not set(mode) <= { 'b', 'w' }:
raise ValueError("Mode must be 'w' or 'wb'")
dirname, self._basename = os.path.split(file)
dirname = dirname or '.'
# We use a dir fd so we can avoid issues with CWD changes
self._dirfd = os.open(dirname, flags = os.O_DIRECTORY)
flags = FLAGS_W
flags &= ~ os.O_CREAT
flags |= os.O_TMPFILE
self._fd = os.open(".", flags, dir_fd=self._dirfd)
# Wrap the fd in an io
self._stream = os.fdopen(self._fd, mode, **kwargs)
def __getattr__(self, key):
return getattr(object.__getattribute__(self, '_stream'), key)
def __enter__(self):
return self
def close(self):
for _ in range(1000):
tempname = b'.' + self._basename.encode('utf8')
tempname += b'.' + bytes(random.choices(mktemp_chars, k=10))
retval = libc.linkat(self._fd, '', self._dirfd, tempname, AT_EMPTY_PATH)
if retval == 0:
break
errno = ctypes.get_errno()
if errno == EEXIST:
continue
raise OSError(errno, os.strerror(errno))
else:
raise FileExistsError
os.fsync(self._fd)
self._stream.close()
os.rename(tempname, self._basename, src_dir_fd=self._dirfd, dst_dir_fd=self._dirfd)
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is None:
self.close()
return
from pathlib import Path
print('start')
print(list(Path().iterdir()))
with atomic_open('potato','w') as f:
print('writing')
print(list(Path().iterdir()))
f.write('test\n')
print('finished')
print(list(Path().iterdir()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment