Skip to content

Instantly share code, notes, and snippets.

@dniku
Last active September 23, 2020 08:28
Show Gist options
  • Save dniku/d18080bec84cc4704a73 to your computer and use it in GitHub Desktop.
Save dniku/d18080bec84cc4704a73 to your computer and use it in GitHub Desktop.
ZMQError: Interrupted system call
import errno
import multiprocessing
import Queue
import threading
import time

import zmq
class RPCServer(threading.Thread):
def __init__(self, rpc_address, queue):
super(RPCServer, self).__init__(name="RPC server")
self.queue = queue
self.rpc_address = rpc_address
self.stopped = False
self.context = None
self.zmq_socket = None
def run(self):
self.context = zmq.Context()
self.zmq_socket = self.context.socket(zmq.REP)
self.zmq_socket.bind(self.rpc_address)
while not self.stopped:
start = time.time()
try:
data = self.zmq_socket.recv(flags=zmq.NOBLOCK)
print('received from socket: %s' % data)
self.queue.put(data)
except zmq.ZMQError as e:
if e.errno != zmq.EAGAIN:
raise
# Looping at most 10 times per second. This is a compromise between continuous
# polling (which is a huge resource hog) and the need to gracefully stop.
# If there is a better way to achieve this, I'd be glad to hear about it.
time.sleep(start + 0.1 - time.time())
def stop(self):
self.stopped = True
class Dispatcher(threading.Thread):
def __init__(self, rpc_address):
super(Dispatcher, self).__init__(name="Dispatcher")
self.queue = Queue.Queue()
self.rpc_server = RPCServer(rpc_address, self.queue)
self.stopped = False
def run(self):
self.rpc_server.start()
while not self.stopped:
try:
item = self.queue.get(block=True, timeout=0.1)
print('received from queue: %s' % item)
if item == 'stop':
self.rpc_server.stop()
self.rpc_server.join()
self.stop()
except Queue.Empty:
pass
def stop(self):
self.stopped = True
class ServerProcess(multiprocessing.Process):
def __init__(self, rpc_address):
super(ServerProcess, self).__init__(name="Server process")
self.dispatcher = Dispatcher(rpc_address)
def run(self):
self.dispatcher.start()
self.dispatcher.join()
class RPCClient(object):
def __init__(self, rpc_address):
self.rpc_address = rpc_address
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect(self.rpc_address)
def stop(self):
self.socket.send('stop')
def shutdown(self):
self.socket.disconnect(self.rpc_address)
def test_stopping():
rpc_address = 'tcp://127.0.0.1:3437'
server = ServerProcess(rpc_address)
client = RPCClient(rpc_address)
server.start()
time.sleep(1)
client.stop()
time.sleep(1)
assert not server.is_alive()
if __name__ == '__main__':
test_stopping()
@dniku
Copy link
Author

dniku commented Aug 26, 2015

Exception in thread RPC server:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/home/pastafarianist/zmq_error.py", line 22, in run
    self.queue.put(self.zmq_socket.recv(flags=zmq.NOBLOCK))
  File "zmq/backend/cython/socket.pyx", line 631, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5775)
  File "zmq/backend/cython/socket.pyx", line 665, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5575)
  File "zmq/backend/cython/socket.pyx", line 139, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:1728)
  File "zmq/backend/cython/checkrc.pxd", line 21, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:6251)
    raise ZMQError(errno)
ZMQError: Interrupted system call

Traceback (most recent call last):
  File "/home/pastafarianist/zmq_error.py", line 90, in <module>
    test_stopping()
  File "/home/pastafarianist/zmq_error.py", line 87, in test_stopping
    assert not server.is_alive()
AssertionError

@chengxiaoku
Copy link

你好, 我也遇到了该问题,请问你是怎么解决的呢,我的邮箱:[email protected]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment