Skip to content

Instantly share code, notes, and snippets.

@Gribouillis
Forked from bregma/self-socket.py
Created February 2, 2025 22:40
Show Gist options
  • Save Gribouillis/fe32ba1fa2201bd534030d98b5debcaa to your computer and use it in GitHub Desktop.
Save Gribouillis/fe32ba1fa2201bd534030d98b5debcaa to your computer and use it in GitHub Desktop.
Using the self-pipe trick in Python3
#!/usr/bin/python3
#
# Investigate the use of the self-pipe trick from Python
#
import logging
import os
import selectors
import signal
import struct
import subprocess
import sys
# Module-wide logging: every record at DEBUG and above is written to stdout
# with a timestamp, the logger name and the level.
lfm = logging.Formatter('%(asctime)s %(name)s [%(levelname)s] %(message)s')

lch = logging.StreamHandler(stream=sys.stdout)
lch.setFormatter(lfm)

logger = logging.getLogger('self-socket')
logger.addHandler(lch)
logger.setLevel(logging.DEBUG)
class ApplicationProxy(object):
    """Proxies the lifetime of an application.

    Only one child process is tracked at a time; calling start() again
    replaces the tracked handle.
    """

    def __init__(self):
        # Handle of the running child, or None when nothing is running.
        self._proc = None
        logger.info('ApplicationProxy created')

    def start(self, cmdline):
        """Launch *cmdline* through the shell and keep a handle to the child.

        The previous implementation called os.popen() and dropped the
        returned pipe, which closed the read end immediately: the child could
        be hit by SIGPIPE/EPIPE on its first write and could never be reaped.
        The child now inherits this process's stdout/stderr instead.

        :param cmdline: shell command line to execute.
        """
        logger.info('ApplicationProxy.start() called')
        # NOTE(review): shell=True keeps the shell semantics os.popen() had;
        # cmdline must come from a trusted source.
        self._proc = subprocess.Popen(cmdline, shell=True)

    def cleanup(self):
        """Reap the child if it has terminated.

        Safe to call when nothing was started or when the child is still
        running (SIGCHLD is also delivered for stopped children); in both
        cases the call only logs.
        """
        logger.info('ApplicationProxy.cleanup() called')
        proc = self._proc
        if proc is None:
            return
        status = proc.poll()  # non-blocking; collects the exit status
        if status is None:
            return  # still running, keep the handle for a later SIGCHLD
        logger.info('application exited with status %s', status)
        self._proc = None
class Container:
    """Stand-in for a LibertineContainer, used for demo purposes only.

    It owns a single ApplicationProxy and forwards launch and cleanup
    requests to it, logging after each step.
    """

    def __init__(self):
        # The proxy is built first so its own creation message precedes ours.
        self._app = ApplicationProxy()
        logger.info('Container created')

    def launch_application(self, cmdline):
        """Start *cmdline* via the application proxy."""
        proxy = self._app
        proxy.start(cmdline)
        logger.info('application launched')

    def cleanup_application(self):
        """Ask the application proxy to clean up after the application."""
        proxy = self._app
        proxy.cleanup()
        logger.info('application cleaned up')
class MainLoop(object):
    """Watches the signal wakeup pipe and dispatches events to the container.

    Signals are observed with the self-pipe trick: the interpreter writes the
    number of every received signal, one byte each, to a non-blocking pipe
    that is registered with a selector.

    This particular design supports only a single subprocess instance.
    """

    def __init__(self, container):
        """Create the selector and install the signal plumbing.

        :param container: object providing cleanup_application(); it is
            called whenever SIGCHLD is seen.
        """
        self._container = container
        self._selector = selectors.DefaultSelector()
        self._set_signal_handlers()
        logger.info('MainLoop created')

    def _set_signal_handlers(self):
        """Route SIGCHLD and SIGINT into a pipe watched by the selector."""
        def noopSignalHandler(*args):
            # A Python-level handler must be installed for the signal to be
            # reported on the wakeup fd; the real work happens in
            # _handle_sig_fd().
            pass

        signal.signal(signal.SIGCHLD, noopSignalHandler)
        signal.signal(signal.SIGINT, noopSignalHandler)
        # set_wakeup_fd() requires a non-blocking descriptor; O_CLOEXEC keeps
        # the pipe out of launched children.
        sig_r_fd, sig_w_fd = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
        signal.set_wakeup_fd(sig_w_fd)
        self._add_read_fd_handler(sig_r_fd, self._handle_sig_fd)

    def _add_read_fd_handler(self, fd, handler):
        """Call handler(fd) from run() whenever *fd* becomes readable."""
        self._selector.register(fd, selectors.EVENT_READ, handler)

    def _handle_sig_fd(self, fd):
        """Drain pending signal numbers from *fd* and act on each of them.

        Several signals may have been queued before the loop wakes up, so the
        chunk read can hold more than one byte; every byte is one signal
        number. Unknown signal numbers are ignored.

        :param fd: read end of the signal wakeup pipe.
        :raises StopIteration: if SIGINT was among the signals read; raised
            only after the whole chunk was processed so that a SIGCHLD
            arriving together with it is not lost.
        """
        try:
            data = os.read(fd, 4)
        except BlockingIOError:
            # Spurious wakeup on the non-blocking pipe: nothing to do.
            return

        interrupted = False
        for sig in data:  # iterating bytes yields ints
            if sig == signal.SIGCHLD:
                logger.info('SIGCHLD received')
                self._container.cleanup_application()
            elif sig == signal.SIGINT:
                logger.info('SIGINT received')
                interrupted = True
        if interrupted:
            raise StopIteration('keyboard interrupt')

    def run(self):
        """Dispatch selector events until a handler raises StopIteration.

        Any other exception also ends the loop; it is logged with its
        traceback rather than propagated.
        """
        logger.info('main_loop.run() begins')
        try:
            while True:
                for key, _mask in self._selector.select():
                    # key.data is the callback given to _add_read_fd_handler.
                    key.data(key.fd)
        except StopIteration:
            pass
        except Exception:
            logger.exception('main loop terminated by unexpected error')
        logger.info('main_loop.run() ends')
def main():
    """Run the demo: build the container and loop, launch a command, wait.

    The main loop is created before the application is launched so that its
    signal handling is already in place when the child starts.
    """
    logger.info('main begins')
    demo_container = Container()
    loop = MainLoop(demo_container)
    demo_container.launch_application('echo "hello world!"')
    loop.run()
    logger.info('main ends')


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment