Created
October 25, 2012 13:52
-
-
Save jul/3952658 to your computer and use it in GitHub Desktop.
using signals as wire
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3.3 | |
import fcntl
import os
import select
import signal as s
import sys
from random import randint
from time import asctime as _asctime
from time import sleep
def asctime():
    """Return the current local time of day as an ``HH:MM:SS`` string.

    ``time.asctime()`` yields a fixed-width string such as
    ``'Sun Jun 20 23:21:05 1993'``; characters 11..18 are the clock part.
    """
    return _asctime()[11:19]
class Processor(object):
    """Turn incoming signals into bits of an integer register.

    The signal at index ``i`` of ``signal_map`` sets bit ``i`` of ``value``;
    ``clear_sig`` resets ``value`` to 0 and ``validate_sig`` prints it.

    The work is split in two halves. The handler installed for every wired
    signal (the "top half") only writes the signal number, as one byte, into
    a pipe. ``bottom_half()`` pops one byte from that pipe and acts on it.
    """

    def __init__(self, signal_map,
                 slow=False, clear_sig=s.SIGHUP, validate_sig=s.SIGCONT):
        """Install the handlers and print a summary of the wiring.

        Args:
            signal_map: sequence of signal numbers; position is the bit index.
            slow: when true, ``bottom_half`` prints an ``RCV`` line and sleeps
                a random 3 to 10 seconds before setting a bit.
            clear_sig: signal that resets ``value`` to 0.
            validate_sig: signal that prints the current ``value``.

        Raises:
            ValueError: if ``clear_sig`` or ``validate_sig`` is also listed
                in ``signal_map``.
        """
        # Validate first so a rejected wiring does not leak the pipe's fds.
        if validate_sig in signal_map or clear_sig in signal_map:
            raise ValueError("Dont wire twice a signal")

        self.p_out, self.p_in = os.pipe()
        for fd in (self.p_in, self.p_out):
            # O_NONBLOCK is a file *status* flag, so it must be set with
            # F_SETFL (F_SETFD only handles descriptor flags such as
            # FD_CLOEXEC). Keep whatever status flags are already set.
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        self.cmd = 0
        self.slow = slow
        self.signal_map = signal_map
        self.clear_sig = clear_sig
        self.validate_sig = validate_sig
        self.value = 0
        # Not used by this class; kept so the attributes remain available.
        self._signal_queue = []
        self.current_signal = None

        help_lines = ["\nHow signal are wired"]
        for offset, sig_no in enumerate(signal_map):
            s.signal(sig_no, self._top_half)
            help_lines.append("sig(%d) sets v[%d]=%d" % (sig_no, offset, 1))
        help_lines.append("attaching clearing to %d" % clear_sig)
        s.signal(clear_sig, self._top_half)
        help_lines.append("attaching validating to %d" % validate_sig)
        s.signal(validate_sig, self._top_half)
        self._help = "\n".join(help_lines)
        print(self._help)

    def _top_half(self, sig_no, frame):
        """Signal handler: queue ``sig_no`` as a single byte on the pipe."""
        try:
            os.write(self.p_in, bytes([sig_no]))
        except BlockingIOError:
            # The pipe is full of unprocessed signals. Blocking here would
            # hang the process (the only reader is the interrupted code), and
            # raising would surface at an arbitrary point of the main
            # program, so the signal is dropped instead.
            pass

    def bottom_half(self):
        """Process at most one queued signal.

        Returns immediately, without touching any state, when no signal is
        waiting in the pipe. Otherwise increments ``cmd``, applies the
        signal's effect to ``value`` and prints an ``ACK`` line. Exits the
        process if the byte read matches no wired signal.
        """
        try:
            data = os.read(self.p_out, 1)
        except BlockingIOError:
            # Nothing queued: the read end is non-blocking.
            return
        if not data:
            # End of file on the pipe; nothing to decode.
            return

        sig_no = ord(data)
        now = asctime()
        seen = self.cmd
        self.cmd += 1
        treated = False
        # NOTE(review): probably meant ``current_signal``; left as in the
        # original so the attribute set is unchanged.
        self.signal = None

        if sig_no in self.signal_map:
            offset = self.signal_map.index(sig_no)
            beauty = randint(3, 10) if self.slow else 0
            if self.slow:
                print("[%d]%s:RCV: sig%d => [%d]=1 in (%d)s" % (
                    seen, now, sig_no, offset, beauty
                ))
            if beauty:
                sleep(beauty)
            self.value |= 1 << offset
            now = asctime()
            print("[%d]%s:ACK: sig%d => [%d]=1 (%d)" % (
                seen, now, sig_no, offset, beauty
            ))
            treated = True

        # Independent ``if``s on purpose: when clear_sig == validate_sig the
        # value is cleared and then printed, as before.
        if sig_no == self.clear_sig:
            print("[%d]%s:ACK clearing value" % (seen, now))
            self.value = 0
            treated = True
        if sig_no == self.validate_sig:
            print("[%d]%s:ACK READING val is %d" % (seen, now, self.value))
            treated = True

        if not treated:
            print("unhandled execption %d" % sig_no)
            # sys.exit rather than the ``exit`` helper, which only exists
            # when the ``site`` module has been loaded.
            sys.exit(0)
# Wire four signals to bits 0..3 of the register, in list order.
wired = Processor([s.SIGUSR1, s.SIGUSR2, s.SIGBUS, s.SIGPWR])

while True:
    # Wait for the signal pipe to become readable instead of calling
    # signal.pause(): pause() only returns for a *new* signal, so a signal
    # delivered while the previous one was still being processed would stay
    # queued in the pipe until yet another signal happened to arrive.
    try:
        select.select([wired.p_out], [], [])
    except InterruptedError:
        # Interpreters predating PEP 475 (Python < 3.5) report the signal
        # that woke us up as EINTR; just wait again, the byte is queued.
        continue
    wired.bottom_half()
    sys.stdout.flush()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment