Skip to content

Instantly share code, notes, and snippets.

@jul
Created October 25, 2012 13:52
Show Gist options
  • Select an option

  • Save jul/3952658 to your computer and use it in GitHub Desktop.

Select an option

Save jul/3952658 to your computer and use it in GitHub Desktop.
using signals as wire
#!/usr/bin/env python3.3
import signal as s
from time import sleep
import fcntl
from time import asctime as _asctime
from random import randint
import os,sys
asctime= lambda : _asctime()[11:19]
class Processor(object):
def __init__(self,signal_map,
slow=False, clear_sig=s.SIGHUP, validate_sig=s.SIGCONT):
self.p_out, self.p_in = os.pipe()
fcntl.fcntl(self. p_in, fcntl.F_SETFD, os.O_NONBLOCK)
fcntl.fcntl(self.p_out, fcntl.F_SETFD, os.O_NONBLOCK)
self.cmd=0
self.slow=slow
self.signal_map = signal_map
self.clear_sig = clear_sig
self.validate_sig = validate_sig
self.value = 0
self._help = [ "\nHow signal are wired"]
self._signal_queue = []
self.current_signal=None
if validate_sig in signal_map or clear_sig in signal_map:
raise Exception("Dont wire twice a signal")
def top_half(sig_no, frame):
## UNPROTECTED CRITICAL SECTION
os.write(self.p_in, bytearray([ sig_no ]) )
## END OF CRITICAL
for offset,sig_no in enumerate(signal_map):
s.signal(sig_no, top_half)
self._help += [ "sig(%d) sets v[%d]=%d"%(sig_no, offset, 1) ]
self._help += [ "attaching clearing to %d" % clear_sig]
s.signal(clear_sig, top_half)
self._help += [ "attaching validating to %d" % validate_sig ]
s.signal(validate_sig,top_half)
self._help = "\n".join( self._help)
print(self._help)
def bottom_half(self):
sig_no = ord(os.read(self.p_out,1))
now = asctime()
seen = self.cmd
self.cmd += 1
treated=False
self.signal=None
if sig_no in self.signal_map:
offset=self.signal_map.index(sig_no)
beauty = randint(3,10) if self.slow else 0
if self.slow:
print("[%d]%s:RCV: sig%d => [%d]=1 in (%d)s" % (
seen,now,sig_no, offset, beauty
))
if beauty:
sleep(beauty)
self.value|= 1<<offset
now=asctime()
print("[%d]%s:ACK: sig%d => [%d]=1 (%d)" % (
seen,now,sig_no, offset, beauty
))
treated=True
if sig_no == self.clear_sig:
print("[%d]%s:ACK clearing value" % (seen,now))
self.value=0
treated=True
if sig_no == self.validate_sig:
print("[%d]%s:ACK READING val is %d" % (seen,now,self.value))
treated=True
if not treated:
print("unhandled execption %d" % sig_no)
exit(0)
wired=Processor([ s.SIGUSR1, s.SIGUSR2, s.SIGBUS, s.SIGPWR ])
while True:
s.pause()
wired.bottom_half()
sys.stdout.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment