Skip to content

Instantly share code, notes, and snippets.

@schlamar
Last active December 14, 2015 01:08
Show Gist options
  • Save schlamar/5003484 to your computer and use it in GitHub Desktop.
Save schlamar/5003484 to your computer and use it in GitHub Desktop.
Asynchronous child IO with pyuv
import os
import re
import pyuv
# Platform line separator as bytes (the child's output is read as bytes).
LINESEP = os.linesep.encode()

# Matches one report line of the form
#     <file_name>:<line_number>:[<position>:] [<code> ]<reason>
# where <file_name> may start with a drive letter ("C:") and <code> is a
# four-character token such as "E501".
# Every fragment is a raw string: "\d", "\s" and "\w" are not valid escape
# sequences in ordinary string literals and trigger a warning on recent
# Python versions. The resulting pattern text is unchanged.
PATTERN = re.compile(
    r'^(?P<file_name>([A-Za-z]:)?[^:]+):'
    r'(?P<line_number>\d+):((?P<position>\d+):)?'
    r'\s+((?P<code>\w{4,4})\s+)?(?P<reason>.*)$')
class LineReaderPipe(pyuv.Pipe):
    """Pipe that reassembles incoming byte chunks into whole lines.

    Chunks handed to :meth:`on_pipe_read` may end in the middle of a line;
    the incomplete tail is kept in ``self.buffer`` until the rest arrives.
    Every complete line is passed to the callback given to
    :meth:`start_read` as ``bytes`` with surrounding whitespace (including
    the line terminator) stripped.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``pyuv.Pipe`` and reset the line state."""
        super(LineReaderPipe, self).__init__(*args, **kwargs)
        self.callback = None  # per-line callable, set by start_read()
        self.buffer = b''  # bytes of the current, not yet terminated line

    def on_pipe_read(self, pipe, data, error):
        """Split a freshly read chunk into lines and dispatch them.

        :param pipe: the pipe the data was read from; closed on error.
        :param data: the bytes that were read.
        :param error: truthy when the read failed or the stream ended
            (presumably end-of-file is also reported this way; confirm
            against the pyuv version in use).
        """
        if error:
            # Deliver a trailing line that never got its terminator before
            # the pipe goes away; close the pipe even if the callback fails.
            try:
                self._flush_buffer()
            finally:
                pipe.close()
            return
        if not data:
            return

        # Join with the pending tail first and split afterwards, so a line
        # (or a "\r\n" terminator) cut in two by a chunk boundary is handled
        # exactly like one that arrived in a single piece. Splitting on
        # b'\n' covers both "\n" and "\r\n"; strip() removes the "\r".
        pieces = (self.buffer + data).split(b'\n')
        # The last piece has no terminator yet (it is b'' when the chunk
        # ended exactly on a line boundary).
        self.buffer = pieces.pop()
        for line in pieces:
            self.callback(line.strip())

    def _flush_buffer(self):
        """Hand a pending, non-blank partial line to the callback."""
        pending = self.buffer.strip()
        self.buffer = b''
        if pending and self.callback is not None:
            self.callback(pending)

    def start_read(self, callback):
        """Start reading; ``callback`` is called with each line as bytes."""
        self.callback = callback
        super(LineReaderPipe, self).start_read(self.on_pipe_read)
def exit_cb(proc, exit_status, term_signal):
    """Print the arguments of the child's exit notification.

    Passed to ``proc.spawn`` in ``main`` as the exit callback.

    :param proc: the process object the notification is for.
    :param exit_status: exit status reported for the child.
    :param term_signal: signal reported for the child's termination.
    """
    # NOTE: under Python 2 this prints a tuple, under Python 3 the three
    # values separated by spaces.
    print (proc, exit_status, term_signal)
def process_line(line):
    """Parse one output line and print its line number and code.

    :param line: a single line as UTF-8 encoded ``bytes``.

    Lines that do not match ``PATTERN`` are ignored silently. For matching
    lines the ``line_number`` and ``code`` groups are printed; ``code`` is
    ``None`` when the line carries no code token.
    """
    text = line.decode('utf-8')
    parsed = PATTERN.match(text)
    if parsed is None:
        return
    print(parsed.group('line_number'), parsed.group('code'))
def main():
    """Spawn ``flake8 bottle.py`` and process its output line by line.

    The second stdio slot of the child is connected to a
    ``LineReaderPipe``; every line read from it goes to ``process_line``.
    ``exit_cb`` is registered as the child's exit callback. The function
    returns when ``loop.run()`` returns.
    """
    loop = pyuv.Loop.default_loop()
    output_pipe = LineReaderPipe(loop)
    child = pyuv.Process(loop)

    # One StdIO entry per slot, in order. The first and third are created
    # with pyuv's defaults; the second gets a freshly created pipe.
    # (Alternative for the second slot, inheriting the parent's fd 1:
    #  pyuv.StdIO(fd=1, flags=pyuv.UV_INHERIT_FD))
    first_io = pyuv.StdIO()
    piped_io = pyuv.StdIO(
        output_pipe, flags=pyuv.UV_CREATE_PIPE | pyuv.UV_WRITABLE_PIPE)
    third_io = pyuv.StdIO()

    child.spawn('flake8', exit_cb, ('bottle.py',),
                stdio=[first_io, piped_io, third_io])
    output_pipe.start_read(process_line)
    loop.run()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment