Created
October 30, 2012 11:16
-
-
Save dbr/3979658 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Broken-but-maybe-informative version of code from | |
http://stackoverflow.com/questions/13124235/python-process-communications-via-pipes-race-condition#13124235 | |
""" | |
import os | |
import string | |
def safe_write(*args, **kwargs): | |
while True: | |
try: | |
return os.write(*args, **kwargs) | |
except OSError as e: | |
if e.errno == 35: | |
import time | |
print(".") | |
time.sleep(0.5) | |
else: | |
raise | |
class Pipe: | |
""" | |
there are a bunch of constants set up here. I dont think it would be useful to include them. Just think like this: Pipe.WHATEVER = 'WHATEVER' | |
""" | |
LINE_START = "LINE_START" | |
MESSAGE_START = "MESSAGE_START" | |
MESSAGE_END = "MESSAGE_END" | |
LINE_END = "LINE_END" | |
KEY = "KEY" | |
VALUE = "VALUE" | |
READLINE_FAIL = "READLINE_FAIL" | |
def __init__(self,sPath): | |
""" | |
create the fifo. if it already exists just associate with it | |
""" | |
self.sPath = sPath | |
if not os.path.exists(sPath): | |
os.mkfifo(sPath) | |
self.iFH = os.open(sPath,os.O_RDWR | os.O_NONBLOCK) | |
self.iFHBlocking = os.open(sPath,os.O_RDWR) | |
def write(self,dMessage): | |
""" | |
write the dict to the fifo | |
if dMessage is not a dictionary then there will be an exception here. There never is | |
""" | |
self.writeln(Pipe.MESSAGE_START) | |
for k in dMessage: | |
self.writeln(Pipe.KEY) | |
self.writeln(k) | |
self.writeln(Pipe.VALUE) | |
self.writeln(dMessage[k]) | |
self.writeln(Pipe.MESSAGE_END) | |
def writeln(self, s): | |
safe_write(self.iFH, bytes('{0} : {1}\n'.format(Pipe.LINE_START, len(s)+1), 'utf-8')) | |
os.fsync(self.iFH) | |
safe_write(self.iFH, bytes('{0}\n'.format(s), 'utf-8')) | |
os.fsync(self.iFH) | |
safe_write(self.iFH, bytes(Pipe.LINE_END+'\n','utf-8')) | |
os.fsync(self.iFH) | |
def readln(self): | |
""" | |
look for LINE_START, get line size | |
read until LINE_END | |
clean up | |
return string | |
""" | |
#os.fsync(self.iFH) | |
iLineStartBaseLength = len(self.LINE_START)+3 #'{0} : ' | |
s = os.read(self.iFH, iLineStartBaseLength).decode('utf-8') | |
print(s) | |
print(len(s), iLineStartBaseLength) | |
assert len(s) == iLineStartBaseLength | |
if Pipe.LINE_START in s: | |
#get the length of the line | |
sLineLen = '' | |
while True: | |
sCurrent = os.read(self.iFH,1).decode('utf-8') | |
if sCurrent == '\n': | |
break | |
sLineLen += sCurrent | |
try: | |
iLineLen = int(sLineLen.strip(string.punctuation+string.whitespace)) | |
except: | |
raise Exception('Not a valid line length: "{0}"'.format(sLineLen)) | |
#read the line | |
# raise Exception(sLineLen) | |
sLine = os.read(self.iFHBlocking,iLineLen).decode('utf-8') | |
#read the line terminator | |
sTerm = os.read(self.iFH,len(Pipe.LINE_END+'\n')).decode('utf-8') | |
if sTerm == Pipe.LINE_END+'\n': | |
return sLine | |
return Pipe.READLINE_FAIL | |
else: | |
return Pipe.READLINE_FAIL | |
def read(self): | |
""" | |
read from the fifo, make a dict | |
""" | |
dRet = {} | |
sKey = '' | |
sValue = '' | |
sCurrent = None | |
def value_flush(): | |
nonlocal dRet, sKey, sValue, sCurrent | |
if sKey: | |
dRet[sKey.strip()] = sValue.strip() | |
sKey = '' | |
sValue = '' | |
sCurrent = '' | |
if True or self.message_start(): #FIXME: hacky | |
while True: | |
try: | |
sLine = self.readln() | |
except OSError: | |
continue # Try again | |
if sLine == Pipe.READLINE_FAIL: | |
continue # try again again | |
self.running_log.append(sLine) | |
if Pipe.MESSAGE_END in sLine: | |
value_flush() | |
return dRet | |
elif Pipe.KEY in sLine: | |
value_flush() | |
sCurrent = Pipe.KEY | |
elif Pipe.VALUE in sLine: | |
sCurrent = Pipe.VALUE | |
else: | |
if sCurrent == Pipe.VALUE: | |
sValue += sLine | |
elif sCurrent == Pipe.KEY: | |
sKey += sLine | |
else: | |
return Pipe.NO_MESSAGE | |
TEST_VALUE = {'a':'a1','b':'b2','c':'c3','d':'d4','e':'e5'} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from piping import Pipe, TEST_VALUE | |
oP = Pipe("/tmp/pipey") | |
while 1: | |
oP.write(TEST_VALUE) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from piping import Pipe, TEST_VALUE | |
oP = Pipe("/tmp/pipey") | |
while 1: | |
oP.running_log = [] | |
line = oP.read() | |
if line == Pipe.READLINE_FAIL: | |
pass | |
elif line != TEST_VALUE: | |
print(line) | |
print(oP.running_log) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment