Skip to content

Instantly share code, notes, and snippets.

@dniku
Created August 24, 2015 19:13
Show Gist options
  • Save dniku/8f196ca5d6d76aed1003 to your computer and use it in GitHub Desktop.
Crash in Spyne
import threading, multiprocessing, time, Queue
from spyne import MethodContext
from spyne.application import Application
from spyne.service import ServiceBase
from spyne.decorator import rpc
from spyne.protocol.msgpack import MessagePackRpc
from spyne.client.zeromq import ZeroMQClient
from spyne.auxproc import process_contexts
from spyne.server.zeromq import ZeroMQServer, ZmqMethodContext
class StoppableZeroMQServer(ZeroMQServer):
    """ZeroMQ server that handles exactly one request per ``serve_once`` call.

    Returning to the caller after every request lets the caller decide
    between requests whether it should keep serving.
    """

    def __init__(self, *args, **kwargs):
        # Pure pass-through: all arguments go to ZeroMQServer unchanged.
        super(StoppableZeroMQServer, self).__init__(*args, **kwargs)

    def serve_once(self):
        """Receive one message on ``self.zmq_socket``, process it, send the reply.

        The request-processing steps were copied from the Spyne sources
        (per the original author's note).
        NOTE(review): ``recv()`` is called without flags, so this presumably
        blocks until a request arrives -- confirm against the socket setup.
        """
        request_ctx = ZmqMethodContext(self)
        request_ctx.in_string = [self.zmq_socket.recv()]

        all_contexts = self.generate_contexts(request_ctx)
        p_ctx = all_contexts[0]
        aux_contexts = all_contexts[1:]

        # Stays None unless one of the processing stages reports an error.
        failure = None

        # Only deserialize the request when context generation succeeded.
        if not p_ctx.in_error:
            self.get_in_object(p_ctx)

        if p_ctx.in_error:
            # Input-side error (from either stage above) becomes the response.
            p_ctx.out_object = p_ctx.in_error
            failure = p_ctx.in_error
        else:
            self.get_out_object(p_ctx)
            if p_ctx.out_error:
                # Error raised while producing the result becomes the response.
                p_ctx.out_object = p_ctx.out_error
                failure = p_ctx.out_error

        self.get_out_string(p_ctx)
        process_contexts(self, aux_contexts, failure)

        reply = ''.join(p_ctx.out_string)
        self.zmq_socket.send(reply)
        p_ctx.close()
class RPCService(ServiceBase):
    """RPC service exposing a single ``stop`` method."""

    @rpc()
    def stop(ctx):
        """Post the stop marker on the queue of the object stored in ``ctx.udc``."""
        stop_queue = ctx.udc.queue
        stop_queue.put("STOPPING!!")
# Spyne application shared by the server and the client in this script:
# MessagePack RPC in both directions, with soft validation on incoming data.
rpc_app = Application(
    [RPCService],
    tns='myrpc',
    in_protocol=MessagePackRpc(validator='soft'),
    out_protocol=MessagePackRpc(),
)
class RPCServer(threading.Thread):
    """Thread that serves RPC requests one at a time until asked to stop.

    ``queue`` carries control messages; the string ``'STOPPING!!'`` ends the
    serving loop. ``rpc_server`` is the underlying one-request-at-a-time
    ZeroMQ server.
    """

    def __init__(self, rpc_address):
        """Create the control queue and the ZeroMQ server for ``rpc_address``."""
        super(RPCServer, self).__init__(name='RPC server thread')
        self.queue = Queue.Queue()

        # Trick to make this RPCServer reachable from RPCService handlers:
        # every 'method_call' event stores the server on ctx.udc.
        def attach_server(ctx):
            ctx.udc = self

        rpc_app.event_manager.add_listener('method_call', attach_server)
        self.rpc_server = StoppableZeroMQServer(rpc_app, rpc_address)

    def run(self):
        """Serve requests until ``'STOPPING!!'`` is read from ``self.queue``."""
        stop_requested = False
        while not stop_requested:
            print('serving')
            self.rpc_server.serve_once()
            try:
                message = self.queue.get(block=True, timeout=0.1)
            except Queue.Empty:
                # Nothing queued after this request: keep serving.
                continue
            stop_requested = (message == 'STOPPING!!')
class RPCClient(ZeroMQClient):
    """ZeroMQ client pre-bound to the module-level ``rpc_app``."""

    def __init__(self, rpc_address):
        """Connect the client for ``rpc_app`` to the given ``rpc_address``."""
        super(RPCClient, self).__init__(rpc_address, rpc_app)
class WrapperProcess(multiprocessing.Process):
    """Child process that hosts the RPC server thread.

    The RPCServer (and with it the ZeroMQ server it creates) is built in
    ``run()``, which executes inside the child process, instead of in
    ``__init__``, which executes in the parent. ZeroMQ contexts and sockets
    must not be carried across ``fork()``: a socket created in the parent
    stays owned by the parent, so requests would never reach the child.
    Deferring construction also keeps this object free of server state when
    a spawn-based start method has to pickle it.
    """

    def __init__(self, rpc_address):
        """Remember ``rpc_address``; no server resources are created yet."""
        super(WrapperProcess, self).__init__(name='Wrapper process')
        self.rpc_address = rpc_address
        # Set in run(), i.e. only inside the child process.
        self.rpc_server = None

    def run(self):
        """Create the RPC server thread in the child and wait for it to end."""
        self.rpc_server = RPCServer(self.rpc_address)
        self.rpc_server.start()
        self.rpc_server.join()
def test_rpc_stop():
    """Start the server process, call ``stop`` over RPC, check that it exits.

    Raises:
        AssertionError: if the process is still alive about one second after
            the ``stop`` call returned.
    """
    rpc_address = 'tcp://127.0.0.1:4005'
    proc = WrapperProcess(rpc_address)
    client = RPCClient(rpc_address)
    proc.start()
    try:
        # Give the child time to come up before sending the request.
        time.sleep(1)
        client.service.stop()
        # Wait at most one second for the exit instead of always sleeping.
        proc.join(1)
        assert not proc.is_alive()
    finally:
        # Never leave the non-daemon child behind: a surviving child would
        # keep the interpreter from exiting after a failure.
        if proc.is_alive():
            proc.terminate()
        proc.join()
# Run the check only when executed as a script. With spawn-based
# multiprocessing the child re-imports this module, and an unguarded call
# would start the test (and another child) again on every import.
if __name__ == '__main__':
    test_rpc_stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment