Skip to content

Instantly share code, notes, and snippets.

@jan-matejka
Created July 16, 2015 18:47
Show Gist options
  • Select an option

  • Save jan-matejka/f289d202504cc95c51fa to your computer and use it in GitHub Desktop.

Select an option

Save jan-matejka/f289d202504cc95c51fa to your computer and use it in GitHub Desktop.
fail works as expected but success hangs
# -*- coding: utf-8 -*-
import asyncio, functools
from nose.tools import raises
class ExitFailure(RuntimeError):
def __init__(self, exit_code):
self.exit_code = exit_code
class ExitSuccess:
pass
class ZeroExitCodeSubprocess(asyncio.SubprocessProtocol):
def __init__(self, future):
self.future = future
def connection_made(self, transport):
self.transport = transport
def process_exited(self):
ec = self.transport.get_returncode()
print(ec)
if ec:
print("fail")
self.future.set_exception(ExitFailure(ec))
else:
print("succ")
self.future.set_result(ExitSucess())
@asyncio.coroutine
def _call(loop, *args, **kw):
f = asyncio.Future(loop = loop)
t, p = yield from loop.subprocess_exec(
lambda: ZeroExitCodeSubprocess(f)
, *args
, **kw
)
yield from p.future
def test_call_success():
loop = asyncio.get_event_loop()
x = _call(loop, "true")
loop.run_until_complete(x)
@raises(ExitFailure)
def test_call_fail():
loop = asyncio.get_event_loop()
x = _call(loop, "false")
loop.run_until_complete(x)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment