Skip to content

Instantly share code, notes, and snippets.

@bergundy
Created August 27, 2012 21:39
Show Gist options
  • Save bergundy/3492507 to your computer and use it in GitHub Desktop.
Save bergundy/3492507 to your computer and use it in GitHub Desktop.
tornado_subprocess
import sys
while True:
l = raw_input()
sys.stdout.write(l+"\n")
sys.stdout.flush()
import signal, sys
from util import *
iterations = int(sys.argv[1])
itertime = float(sys.argv[2])
killtime = float(sys.argv[3])
@engine
def sigterm(*args):
print "got signal"
yield delay(killtime)
halt()
signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigterm)
@init
def main():
for x in range(iterations):
print 'iter: %d' % x
yield delay(itertime)
print >>sys.stderr, 'iter: %d' % x
yield delay(itertime)
halt()
main()
import signal, sys
from util import *
iterations = int(sys.argv[1])
itertime = float(sys.argv[2])
killtime = float(sys.argv[3])
@engine
def sigterm(*args):
print >>sys.stderr, "got signal"
yield delay(killtime)
halt()
signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigterm)
@init
def main():
for x in range(iterations):
sys.stdout.write('-')
yield delay(itertime)
halt()
main()
import unittest, sys, os, signal, time
from tornado_subprocess import *
from tornado.testing import AsyncTestCase
from mock import patch
class TestCase(AsyncTestCase):
def setUp(self):
super(TestCase, self).setUp()
Popen._manager = None
self.stdout = []
self.stderr = []
def kill(self):
try:
os.kill(self.p.pid, signal.SIGKILL)
pid, rc = os.waitpid(self.p.pid, 0)
except Exception, e:
pass
def tearDown(self):
self.kill()
def relative_command(self, cmd):
return [ sys.executable, os.path.join(os.path.abspath(os.path.dirname(__file__)), cmd) ]
def make_command(self, iterations, itertime, killtime):
return self.relative_command("make_it_last.py") + [ "%d" % iterations, "%0.2f" % itertime, "%0.2f" % killtime ]
def make_command_no_new_lines(self, iterations, itertime, killtime):
return self.relative_command("print_wo_newlines.py") + [ "%d" % iterations, "%0.2f" % itertime, "%0.2f" % killtime ]
def on_stdout(self, data):
self.stdout.append(data)
self.stop()
def on_stderr(self, data):
self.stderr.append(data)
self.stop()
def test_line_streaming_wo_newline(self):
chars_to_print = 409600
cmd = self.make_command_no_new_lines(chars_to_print, 0.001, 0.)
self.p = Popen(cmd, on_stdout = self.on_stdout, io_loop = self.io_loop)
self.wait()
self.wait()
self.io_loop.add_timeout(time.time()+0.01, self.stop)
self.wait()
self.p.terminate()
self.p.wait()
self.io_loop.add_timeout(time.time()+0.01, self.stop)
self.wait()
self.assertEqual(len(self.stdout), 3)
def test_popen(self):
cmd = self.make_command(1, 0., 0.)
self.p = Popen(cmd, io_loop = self.io_loop, close_fds = True)
def test_popen_exit_callback(self):
cmd = self.make_command(1, 0., 0.)
def on_exit(rc):
self.stop(rc)
self.p = Popen(cmd, io_loop = self.io_loop, close_fds = True, on_exit = on_exit)
rc = self.wait()
self.assertEqual(rc, 0)
def test_popen_stdout(self):
cmd = self.make_command(1, 0., 0.)
self.p = Popen(cmd, on_stdout = self.on_stdout, io_loop = self.io_loop, close_fds = True)
self.wait()
self.assertEqual(len(self.stdout), 1)
def test_popen_stdout_3_iterations(self):
cmd = self.make_command(3, 0., 0.)
self.p = Popen(cmd, on_stdout = self.on_stdout, io_loop = self.io_loop, close_fds = True)
self.wait()
self.assertEqual(len(self.stdout[0].split('\n')), 4)
def test_popen_stdout_3_iterations_w_delay(self):
cmd = self.make_command(3, 0.05, 0.)
self.p = Popen(cmd, on_stdout = self.on_stdout, io_loop = self.io_loop, close_fds = True)
self.wait()
self.assertEqual(len(self.stdout[0].split('\n')), 4)
def test_popen_stderr(self):
cmd = self.make_command(1, 0., 0.)
self.p = Popen(cmd, on_stderr = self.on_stderr, io_loop = self.io_loop, close_fds = True)
self.wait()
self.assertEqual(len(self.stderr), 1)
def test_popen_both(self):
cmd = self.make_command(1, 0., 0.)
self.p = Popen(cmd, on_stdout = self.on_stdout, on_stderr = self.on_stderr, io_loop = self.io_loop, close_fds = True)
self.wait()
self.wait()
self.assertEqual(len(self.stdout), 1)
self.assertEqual(len(self.stderr), 1)
def test_popen_redirect(self):
cmd = self.make_command(1, 0., 0.)
self.p = Popen(cmd, on_stdout = self.on_stdout, redirect_stderr = True, io_loop = self.io_loop, close_fds = True)
self.wait()
self.wait()
self.assertEqual(len(self.stdout), 2)
def test_popen_write(self):
cmd = self.relative_command('echo.py')
self.p = Popen(cmd, on_stdout = self.on_stdout, io_loop = self.io_loop, close_fds = True)
self.p.write('hello\n')
self.wait()
self.assertEqual(self.stdout, ['hello\n'])
def test_popen_partial_write(self):
cmd = self.relative_command('echo.py')
self.p = Popen(cmd, on_stdout = self.on_stdout, io_loop = self.io_loop, close_fds = True)
test = self
class MockWrite(object):
def __init__(self):
self.args = []
def __call__(self, fd, data):
self.args.append(data)
test.stop()
return 1
with patch('os.write', new_callable = MockWrite) as mock_write:
self.p.write('12')
self.wait()
self.assertEqual(mock_write.args, ['12', '2'])
def test_popen_write_retry(self):
cmd = self.relative_command('echo.py')
self.p = Popen(cmd, on_stdout = self.on_stdout, io_loop = self.io_loop, close_fds = True)
test = self
class MockWrite(object):
def __init__(self):
self.called = False
self.args = []
def __call__(self, fd, data):
self.args.append(data)
if not self.called:
self.called = True
raise OSError()
test.stop()
return 2
with patch('os.write', new_callable = MockWrite) as mock_write:
self.p.write('12')
self.wait()
self.assertEqual(mock_write.args, ['12', '12'])
import tornado.ioloop
import subprocess
import shlex
import os
import fcntl
import logging
import signal
import functools
from tornado import stack_context
from datetime import timedelta
from collections import deque
class Manager(object):
def __init__(self, io_loop = None):
self.io_loop = io_loop or tornado.ioloop.IOLoop.instance()
self.children = dict()
signal.signal(signal.SIGCHLD, self.on_child_died)
def on_child_died(self, *args, **kw_args):
self.io_loop.add_timeout(timedelta(milliseconds = 10), self.reap)
def reap(self):
pid = None
while True:
try:
pid, rc = os.waitpid(-1, os.WNOHANG)
except OSError, e:
if e.errno != 10: # No child processes
raise
else:
break
child = self.children.pop(pid, None)
child.join(rc)
class Popen(subprocess.Popen):
_manager = None
def __init__(self, cmd, uid = None, redirect_stderr = False, on_stdout = None, on_stderr = None, on_exit = None, io_loop = None, *args, **kwargs):
if isinstance(cmd, basestring):
cmd = shlex.split(str(cmd))
self.io_loop = io_loop or tornado.ioloop.IOLoop.instance()
if on_stdout:
self.on_stdout = stack_context.wrap(on_stdout)
kwargs['stdout'] = subprocess.PIPE
if redirect_stderr:
kwargs['stderr'] = subprocess.STDOUT
elif on_stderr:
self.on_stderr = stack_context.wrap(on_stderr)
kwargs['stderr'] = subprocess.PIPE
if on_exit:
self.on_exit = stack_context.wrap(on_exit)
kwargs['stdin'] = subprocess.PIPE
if uid is not None:
kwargs['preexec_fn'] = lambda: os.setuid(uid)
super(Popen, self).__init__(cmd, *args, **kwargs)
self.prepare_stdin()
with stack_context.NullContext():
if hasattr(self, 'on_stdout'):
self.prepare_fd('stdout')
if hasattr(self, 'on_stderr'):
self.prepare_fd('stderr')
self.manager.children[self.pid] = self
@property
def manager(self):
if self._manager is None:
self.__class__._manager = Manager(io_loop = self.io_loop)
return self._manager
def callback(self, callback, *args, **kwargs):
with stack_context.NullContext():
self.io_loop.add_callback(functools.partial(callback, *args, **kwargs))
def unblock_fd(self, fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def prepare_stdin(self):
self.write_buffer = deque()
self._writing = False
self.stdin_writer = self.stdin.fileno()
self.unblock_fd(self.stdin_writer)
def prepare_fd(self, facility):
reader = getattr(self, facility).fileno()
setattr(self, '%s_reader' % facility, reader)
self.unblock_fd(reader)
self.io_loop.add_handler(reader, self._data_ready, self.io_loop.READ)
def drain(self):
if getattr(self, 'stdout_reader', None):
self.read_from_stdout()
del self.stdout_reader
if getattr(self, 'stderr_reader', None):
self.read_from_stderr()
del self.stderr_reader
def read_from_stderr(self):
self._data_ready(self.stderr_reader, self.io_loop.READ)
def read_from_stdout(self):
self._data_ready(self.stdout_reader, self.io_loop.READ)
def _data_ready(self, fd, events):
try:
data = os.read(fd, 4096)
if len(data) == 0:
logging.info("Child pipe closed")
self.io_loop.remove_handler(fd)
except OSError, e:
if e.errno != 11: # Resource temporarily unavailable
logging.error("Exception while reading from child pipe: %r" %e)
else:
if getattr(self, 'stdout_reader', None) == fd:
self.callback(self.on_stdout, data)
if len(data) == 0:
del self.stdout_reader
elif getattr(self, 'stderr_reader', None) == fd:
self.callback(self.on_stderr, data)
if len(data) == 0:
del self.stderr_reader
def _stdin_ready(self, fd, events):
while self.write_buffer:
try:
data = self.write_buffer.popleft()
bytes_written = os.write(fd, data)
if bytes_written < len(data):
self.write_buffer.appendleft(data[bytes_written:])
except OSError, e:
if e.errno != 11: # Resource temporarily unavailable
logging.error("Exception while writing to child pipe: %r" %e)
self.write_buffer.appendleft(data)
self.io_loop.remove_handler(fd)
self._writing = False
def write(self, data):
self.write_buffer.append(data)
if not self._writing:
with stack_context.NullContext():
self.io_loop.add_handler(self.stdin_writer, self._stdin_ready, self.io_loop.WRITE)
self._writing = True
def __del__(self):
self.drain()
super(Popen, self).__del__()
def join(self, rc):
if hasattr(self, 'on_exit'):
self.callback(self.on_exit, rc)
super(Popen, self).join()
from tornado.gen import engine, Runner, Task
from tornado.ioloop import IOLoop
from functools import wraps
import time
def init(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
engine(fn)(*args, **kwargs)
IOLoop.instance().start()
return wrapper
def halt():
IOLoop.instance().stop()
def delay(seconds):
return Task(IOLoop.instance().add_timeout, time.time()+seconds)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment