-
-
Save jakirkham/100a7f5e86b0ff2a22de0850723a4c5c to your computer and use it in GitHub Desktop.
import ctypes | |
import mmap | |
import os | |
import stat | |
import sys | |
try: | |
unicode | |
except NameError: | |
unicode = str | |
rtld = ctypes.cdll.LoadLibrary(None) | |
_shm_open = rtld.shm_open | |
_shm_unlink = rtld.shm_unlink | |
def shm_open(name):
    """Create a new POSIX shared memory object and return its descriptor.

    The object is opened with ``O_RDWR | O_CREAT | O_EXCL`` and mode
    ``S_IRUSR | S_IWUSR``, so the call fails when an object called *name*
    already exists.

    Parameters
    ----------
    name : bytes or unicode
        Name of the shared memory object. Text is encoded as UTF-8 before
        it is handed to the C function.

    Returns
    -------
    int
        File descriptor referring to the shared memory object.

    Raises
    ------
    TypeError
        If *name* is neither ``bytes`` nor ``unicode``.
    RuntimeError
        If the underlying ``shm_open`` call returns ``-1``.
    """
    if isinstance(name, unicode):
        # shm_open() takes a ``const char *``. A wide-character buffer would
        # be read by C as a byte string that stops at the first zero byte,
        # so text has to be encoded to bytes first.
        name = name.encode("utf-8")
    elif not isinstance(name, bytes):
        raise TypeError("`name` must be `bytes` or `unicode`")

    result = _shm_open(
        name,
        ctypes.c_int(os.O_RDWR | os.O_CREAT | os.O_EXCL),
        ctypes.c_ushort(stat.S_IRUSR | stat.S_IWUSR),
    )
    if result == -1:
        # NOTE: ctypes.get_errno() only mirrors the C errno when the library
        # handle was created with use_errno=True.
        raise RuntimeError(os.strerror(ctypes.get_errno()))
    return result
def shm_unlink(name):
    """Remove the POSIX shared memory object called *name*.

    Parameters
    ----------
    name : bytes or unicode
        Name of the shared memory object. Text is encoded as UTF-8 before
        it is handed to the C function.

    Raises
    ------
    TypeError
        If *name* is neither ``bytes`` nor ``unicode``.
    RuntimeError
        If the underlying ``shm_unlink`` call returns ``-1``.
    """
    if isinstance(name, unicode):
        # shm_unlink() takes a ``const char *``. A wide-character buffer would
        # be read by C as a byte string that stops at the first zero byte,
        # so text has to be encoded to bytes first.
        name = name.encode("utf-8")
    elif not isinstance(name, bytes):
        raise TypeError("`name` must be `bytes` or `unicode`")

    result = _shm_unlink(name)
    if result == -1:
        # NOTE: ctypes.get_errno() only mirrors the C errno when the library
        # handle was created with use_errno=True.
        raise RuntimeError(os.strerror(ctypes.get_errno()))
import os | |
import mmap | |
import numpy as np | |
from shm import * | |
# POSIX only defines portable behaviour for names that begin with "/".
# The name is given as bytes so it reaches the C call as a plain char string.
fid = shm_open(b"/arr")
size = 10
# Give the freshly created object its length. A process attaching to an
# existing, already sized object would skip this step.
os.ftruncate(fid, size)
m = mmap.mmap(fid, size)
# Expose the mapping as a uint8 array without copying it.
a = np.frombuffer(m, dtype=np.uint8)
On Linux, something like this is preferable, since the kernel releases the memory automatically (`shm_open` requires a corresponding `shm_unlink`; otherwise it leaks an open "file" under `/dev/shm/`):
import os
import mmap
def get_shared(size=1024, name="shared_memory"):
fd = os.memfd_create(name)
os.ftruncate(fd, size)
m = mmap.mmap(fd, size)
os.close(fd)
return m
The most annoying part is not using POSIX shared memory from Python but synchronizing access to it. There is a package named posix_ipc that exposes POSIX semaphores, shared memory, and message queues as Python objects, but a POSIX semaphore has a limitation: it is not released automatically when the process holding it crashes, which causes a waiting process to block indefinitely. A POSIX robust mutex placed in shared memory might be the best choice for inter-process cases where a mutex fits, but there seems to be no Python module that wraps it. Hence I'm more interested in how to use a POSIX mutex from Python and put it in shared memory so that it can be accessed by different processes, possibly written in different languages, e.g. a Python process and a C process.
The string buffer handling here is slightly confusing. Can't we directly pass in an encoded name (i.e. bytes), at least if we set argtypes for the functions?