Created
August 24, 2015 19:13
-
-
Save dniku/8f196ca5d6d76aed1003 to your computer and use it in GitHub Desktop.
Crash in Spyne
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading, multiprocessing, time, Queue | |
from spyne import MethodContext | |
from spyne.application import Application | |
from spyne.service import ServiceBase | |
from spyne.decorator import rpc | |
from spyne.protocol.msgpack import MessagePackRpc | |
from spyne.client.zeromq import ZeroMQClient | |
from spyne.auxproc import process_contexts | |
from spyne.server.zeromq import ZeroMQServer, ZmqMethodContext | |
class StoppableZeroMQServer(ZeroMQServer):
    """ZeroMQ server transport that processes one request per call.

    ``serve_once`` performs a single receive / process / reply cycle and then
    returns, which lets the caller decide between cycles whether to go on
    serving.
    """

    def __init__(self, *args, **kwargs):
        # Plain pass-through to the base transport; no extra state is added.
        super(StoppableZeroMQServer, self).__init__(*args, **kwargs)

    def serve_once(self):
        """Receive one message, run it through the application and reply.

        The body follows the request-handling sequence taken from the Spyne
        sources: build the contexts, deserialize, call the handler, serialize,
        run the remaining contexts, send the reply and close the context.
        """
        request_ctx = ZmqMethodContext(self)
        request_ctx.in_string = [self.zmq_socket.recv()]

        all_contexts = self.generate_contexts(request_ctx)
        p_ctx = all_contexts[0]
        others = all_contexts[1:]

        # Walk the pipeline until the first stage that reports a failure.
        failure = p_ctx.in_error
        if not failure:
            self.get_in_object(p_ctx)
            failure = p_ctx.in_error
            if not failure:
                self.get_out_object(p_ctx)
                failure = p_ctx.out_error

        # A failure replaces the response object and is also forwarded to the
        # remaining contexts.
        error = None
        if failure:
            p_ctx.out_object = failure
            error = failure

        self.get_out_string(p_ctx)
        process_contexts(self, others, error)
        self.zmq_socket.send(''.join(p_ctx.out_string))
        p_ctx.close()
class RPCService(ServiceBase):
    """Service exposing a single ``stop`` remote procedure."""

    @rpc()
    def stop(ctx):
        """Post the stop message on the queue reachable through ``ctx.udc``."""
        stop_queue = ctx.udc.queue
        stop_queue.put("STOPPING!!")
# Application shared by the server and the client: one service, with
# MessagePack RPC used for both incoming and outgoing messages.
rpc_app = Application(
    [RPCService], tns='myrpc',
    in_protocol=MessagePackRpc(validator='soft'),
    out_protocol=MessagePackRpc(),
)
class RPCServer(threading.Thread):
    """Thread that serves RPC requests one by one until asked to stop.

    The ``stop`` RPC handler reaches this object through ``ctx.udc`` and puts
    the ``'STOPPING!!'`` message on ``self.queue``; ``run`` checks the queue
    after every served request and leaves the loop when it sees that message.

    :param rpc_address: ZeroMQ endpoint handed to ``StoppableZeroMQServer``.
    """

    def __init__(self, rpc_address):
        super(RPCServer, self).__init__(name='RPC server thread')
        self.queue = Queue.Queue()

        # Expose this RPCServer to the service methods: every method call gets
        # the server instance stored in ctx.udc before the handler runs.
        def on_method_call(ctx):
            ctx.udc = self

        rpc_app.event_manager.add_listener('method_call', on_method_call)
        self.rpc_server = StoppableZeroMQServer(rpc_app, rpc_address)

    def run(self):
        """Serve requests until the stop message shows up on the queue."""
        while True:
            print('serving')
            self.rpc_server.serve_once()

            # Poll without blocking: the stop message is queued by the RPC
            # handler while the request is being served (assumes Spyne calls
            # the handler synchronously inside serve_once - TODO confirm), so
            # waiting here would only add latency to every request.
            try:
                msg = self.queue.get_nowait()
            except Queue.Empty:
                continue

            if msg == 'STOPPING!!':
                break
class RPCClient(ZeroMQClient):
    """ZeroMQ client pre-bound to the module-level ``rpc_app``."""

    def __init__(self, rpc_address):
        # Only the endpoint varies; the application is always rpc_app.
        super(RPCClient, self).__init__(url=rpc_address, app=rpc_app)
class WrapperProcess(multiprocessing.Process):
    """Child process that hosts an ``RPCServer`` thread until it finishes.

    The server is created inside ``run`` (that is, in the child process)
    rather than in ``__init__``: constructing it in the parent would create the
    ZeroMQ context and socket before the fork, and those must not be shared
    across processes. It also keeps this object free of unpicklable state,
    which the spawn start method requires.

    :param rpc_address: ZeroMQ endpoint the server will be created with.
    """

    def __init__(self, rpc_address):
        super(WrapperProcess, self).__init__(name='Wrapper process')
        self.rpc_address = rpc_address
        # Populated in run(); stays None in the parent process.
        self.rpc_server = None

    def run(self):
        """Create the RPC server in this process and wait for it to stop."""
        self.rpc_server = RPCServer(self.rpc_address)
        self.rpc_server.start()
        self.rpc_server.join()
def test_rpc_stop():
    """Start the server process, call ``stop`` over RPC and expect it to exit.

    Raises ``AssertionError`` when the process is still alive one second after
    the ``stop`` call. The child process is terminated in that case so that a
    failed run does not leave it behind.
    """
    rpc_address = 'tcp://127.0.0.1:4005'
    proc = WrapperProcess(rpc_address)
    client = RPCClient(rpc_address)
    proc.start()
    try:
        # Give the child time to come up before issuing the call.
        time.sleep(1)
        client.service.stop()
        # Wait up to one second for the exit instead of always sleeping.
        proc.join(1)
        assert not proc.is_alive()
    finally:
        if proc.is_alive():
            proc.terminate()
            proc.join()


# Guarded so that importing this module (which multiprocessing does in the
# child under the spawn start method) does not start the test again.
if __name__ == '__main__':
    test_rpc_stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment