Skip to content

Instantly share code, notes, and snippets.

@jul
Created November 5, 2012 00:34
Show Gist options
  • Select an option

  • Save jul/4014563 to your computer and use it in GitHub Desktop.

Select an option

Save jul/4014563 to your computer and use it in GitHub Desktop.
signal as wired select version
#!/usr/bin/env python3.3
import signal as s
from time import sleep
import fcntl
import select
from time import asctime as _asctime
from random import randint
import os,sys
asctime= lambda : _asctime()[11:19]
class Processor(object):
def __init__(self,signal_map,
slow=False, clear_sig=s.SIGHUP, validate_sig=s.SIGCONT):
self.p_out, self.p_in = os.pipe()
fcntl.fcntl(self.p_in, fcntl.F_SETFD, os.O_NONBLOCK)
fcntl.fcntl(self.p_out, fcntl.F_SETFD, os.O_NONBLOCK)
self.cmd=0
self.slow=slow
self.signal_map = signal_map
self.clear_sig = clear_sig
self.validate_sig = validate_sig
self.value = 0
self._help = [ "\nHow signal are wired"]
self._signal_queue = []
self.current_signal=None
if validate_sig in signal_map or clear_sig in signal_map:
raise Exception("Dont wire twice a signal")
def top_half(sig_no, frame):
## UNPROTECTED CRITICAL SECTION
os.write(self.p_in, bytearray([ sig_no ]))
## END OF CRITICAL
for offset,sig_no in enumerate(signal_map):
s.signal(sig_no, top_half)
self._help += [ "sig(%d) sets v[%d]=%d"%(sig_no, offset, 1) ]
self._help += [ "attaching clearing to %d" % clear_sig]
s.signal(clear_sig, top_half)
self._help += [ "attaching validating to %d" % validate_sig ]
s.signal(validate_sig,top_half)
self._help = "\n".join( self._help)
print(self._help)
def bottom_half(self):
sig_no = ord(os.read(self.p_out,1))
now = asctime()
seen = self.cmd
self.cmd += 1
treated=False
self.signal=None
if sig_no in self.signal_map:
offset=self.signal_map.index(sig_no)
beauty = randint(3,10) if self.slow else 0
if self.slow:
print("[%d]%s:RCV: sig%d => [%d]=1 in (%d)s" % (
seen,now,sig_no, offset, beauty
))
if beauty:
sleep(beauty)
self.value|= 1<<offset
now=asctime()
print("[%d]%s:ACK: sig%d => [%d]=1 (%d)" % (
seen,now,sig_no, offset, beauty
))
treated=True
if sig_no == self.clear_sig:
print("[%d]%s:ACK clearing value" % (seen,now))
self.value=0
treated=True
if sig_no == self.validate_sig:
print("[%d]%s:ACK READING val is %d" % (seen,now,self.value))
treated=True
if not treated:
print("unhandled execption %d" % sig_no)
exit(0)
wired=Processor([ s.SIGUSR1, s.SIGUSR2, s.SIGBUS, s.SIGPWR ])
while True:
eintr=False
if not eintr or select.select([wired.p_out],[],[]):
try:
wired.bottom_half()
sys.stdout.flush()
eintr=False
except OSError as err:
if err.errno==4:
eintr=True
print("SNAFU EINTR")
try:
os.lseek(wired.p_out,0,0)
except OSError:
pass
@jul
Copy link
Author

jul commented Nov 5, 2012

Output:
$ for i in 10 12 7 30 18 1; do echo "echo "-------------$i------------"; kill -$i 4525 & " ; done | bash
-------------10------------
-------------12------------
SNAFU EINTR
-------------7------------
[0]01:35:07:ACK: sig10 => [0]=1 (0)
-------------30------------
-------------18------------
SNAFU EINTR
-------------1------------
[1]01:35:07:ACK: sig30 => [3]=1 (0)
SNAFU EINTR
[2]01:35:07:ACK clearing value
650 jul@faith:~/src/signal 01:35:07
$ SNAFU EINTR
[3]01:35:07:ACK READING val is 0
[4]01:35:07:ACK: sig7 => [2]=1 (0)
SNAFU EINTR
[5]01:35:07:ACK: sig12 => [1]=1 (0)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment