Last active
September 23, 2020 08:28
-
-
Save dniku/d18080bec84cc4704a73 to your computer and use it in GitHub Desktop.
ZMQError: Interrupted system call
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing, threading, time, Queue | |
import zmq | |
class RPCServer(threading.Thread): | |
def __init__(self, rpc_address, queue): | |
super(RPCServer, self).__init__(name="RPC server") | |
self.queue = queue | |
self.rpc_address = rpc_address | |
self.stopped = False | |
self.context = None | |
self.zmq_socket = None | |
def run(self): | |
self.context = zmq.Context() | |
self.zmq_socket = self.context.socket(zmq.REP) | |
self.zmq_socket.bind(self.rpc_address) | |
while not self.stopped: | |
start = time.time() | |
try: | |
data = self.zmq_socket.recv(flags=zmq.NOBLOCK) | |
print('received from socket: %s' % data) | |
self.queue.put(data) | |
except zmq.ZMQError as e: | |
if e.errno != zmq.EAGAIN: | |
raise | |
# Looping at most 10 times per second. This is a compromise between continuous | |
# polling (which is a huge resource hog) and the need to gracefully stop. | |
# If there is a better way to achieve this, I'd be glad to hear about it. | |
time.sleep(start + 0.1 - time.time()) | |
def stop(self): | |
self.stopped = True | |
class Dispatcher(threading.Thread): | |
def __init__(self, rpc_address): | |
super(Dispatcher, self).__init__(name="Dispatcher") | |
self.queue = Queue.Queue() | |
self.rpc_server = RPCServer(rpc_address, self.queue) | |
self.stopped = False | |
def run(self): | |
self.rpc_server.start() | |
while not self.stopped: | |
try: | |
item = self.queue.get(block=True, timeout=0.1) | |
print('received from queue: %s' % item) | |
if item == 'stop': | |
self.rpc_server.stop() | |
self.rpc_server.join() | |
self.stop() | |
except Queue.Empty: | |
pass | |
def stop(self): | |
self.stopped = True | |
class ServerProcess(multiprocessing.Process): | |
def __init__(self, rpc_address): | |
super(ServerProcess, self).__init__(name="Server process") | |
self.dispatcher = Dispatcher(rpc_address) | |
def run(self): | |
self.dispatcher.start() | |
self.dispatcher.join() | |
class RPCClient(object): | |
def __init__(self, rpc_address): | |
self.rpc_address = rpc_address | |
self.context = zmq.Context() | |
self.socket = self.context.socket(zmq.REQ) | |
self.socket.connect(self.rpc_address) | |
def stop(self): | |
self.socket.send('stop') | |
def shutdown(self): | |
self.socket.disconnect(self.rpc_address) | |
def test_stopping(): | |
rpc_address = 'tcp://127.0.0.1:3437' | |
server = ServerProcess(rpc_address) | |
client = RPCClient(rpc_address) | |
server.start() | |
time.sleep(1) | |
client.stop() | |
time.sleep(1) | |
assert not server.is_alive() | |
if __name__ == '__main__': | |
test_stopping() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment