Last active
March 3, 2025 00:59
-
-
Save 140am/ca661b9a4fca550f9554 to your computer and use it in GitHub Desktop.
Simple Python / ØMQ IPC (Inter Process Communication) performance benchmark
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Simple IPC benchmark test | |
Test throughput of 512 KB messages sent between two python processes using: | |
- multiprocessing pipe | |
- zeroMQ PUSH/PULL | |
- zeroMQ DEALER/DEALER | |
Result: | |
2014-05-20 16:16:32,782 INFO 11612 ipc - Running multiprocessing Pipe() benchmark: | |
2014-05-20 16:16:33,871 INFO 11612 ipc Sending 10000 numbers to Pipe() took 1.08912396431 seconds | |
2014-05-20 16:16:44,629 INFO 11612 ipc Sending 100000 numbers to Pipe() took 10.7582190037 seconds | |
2014-05-20 16:18:32,023 INFO 11612 ipc Sending 1000000 numbers to Pipe() took 107.393200874 seconds | |
2014-05-20 16:18:35,026 INFO 11612 ipc - Running ZMQ() DEALER/DEALER benchmark: | |
2014-05-20 16:18:36,151 INFO 11612 ipc Sending 10000 numbers via ZMQ() took 1.12546110153 seconds | |
2014-05-20 16:18:45,249 INFO 11612 ipc Sending 100000 numbers via ZMQ() took 9.09737110138 seconds | |
2014-05-20 16:20:32,940 INFO 11612 ipc Sending 1000000 numbers via ZMQ() took 107.690907001 seconds | |
2014-05-20 16:20:35,943 INFO 11612 ipc - Running ZMQ() PUSH/PULL benchmark: | |
2014-05-20 16:20:36,980 INFO 11612 ipc Sending 10000 numbers via ZMQ() took 1.03703999519 seconds | |
2014-05-20 16:20:48,757 INFO 11612 ipc Sending 100000 numbers via ZMQ() took 11.7766180038 seconds | |
2014-05-20 16:22:57,251 INFO 11612 ipc Sending 1000000 numbers via ZMQ() took 128.493817091 seconds | |
""" | |
from gevent import monkey | |
monkey.patch_all(thread=False, socket=False) | |
import multiprocessing | |
import time | |
import zmq | |
import logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)-15s %(levelname)-8s %(process)d %(module)s %(message)s' | |
) | |
log = logging.getLogger(__name__) | |
# multiprocessing Pipe | |
def pipe_reader(pipe, count): | |
output_p, input_p = pipe | |
input_p.close() # We are only reading | |
cc = 0 | |
while cc < count: | |
try: | |
msg = output_p.recv() | |
except EOFError: | |
break | |
else: | |
cc += 1 | |
def pipe_writer(count, input_p): | |
message = 'x'*512*1024 | |
for ii in xrange(0, count): | |
input_p.send(message) | |
# ZeroMQ DEALER/DEALER | |
class ZmqReaderDealer(multiprocessing.Process): | |
def __init__(self, count): | |
super(ZmqReaderDealer, self).__init__() | |
self.count = count | |
self.cc = 0 | |
def run(self): | |
ctx = zmq.Context() | |
sub = ctx.socket(zmq.DEALER) | |
#sub.setsockopt(zmq.RCVHWM, 1) | |
#sub.setsockopt(zmq.LINGER, 1) | |
sub.bind('ipc:///tmp/zmqtest') | |
while self.cc < self.count: | |
self.cc += 1 | |
msg = sub.recv(copy=False) | |
sub.close() | |
ctx.term() | |
class ZMQWriterDealer(multiprocessing.Process): | |
def __init__(self, count): | |
super(ZMQWriterDealer, self).__init__() | |
self.count = count | |
self.cc = 0 | |
def run(self): | |
ctx = zmq.Context() | |
pub = ctx.socket(zmq.DEALER) | |
#pub.setsockopt(zmq.SNDHWM, 1) | |
#pub.setsockopt(zmq.LINGER, 1) | |
pub.connect('ipc:///tmp/zmqtest') | |
time.sleep(0.1) | |
message = 'x'*512*1024 | |
for ii in xrange(0, self.count): | |
self.cc += 1 | |
pub.send(message, copy=False) | |
pub.close() | |
ctx.term() | |
# ZeroMQ ROUTER/ROUTER | |
class ZmqReaderRouter(multiprocessing.Process): | |
def __init__(self, count): | |
super(ZmqReaderRouter, self).__init__() | |
self.count = count | |
self.cc = 0 | |
def run(self): | |
ctx = zmq.Context() | |
sub = ctx.socket(zmq.ROUTER) | |
#sub.setsockopt(zmq.RCVHWM, 1) | |
#sub.setsockopt(zmq.LINGER, 1) | |
sub.setsockopt(zmq.IDENTITY, 'reader') | |
sub.bind('ipc:///tmp/zmqtest') | |
while self.cc < self.count: | |
self.cc += 1 | |
msg = sub.recv(copy=False) | |
sub.close() | |
ctx.term() | |
class ZMQWriterRouter(multiprocessing.Process): | |
def __init__(self, count): | |
super(ZMQWriterRouter, self).__init__() | |
self.count = count | |
self.cc = 0 | |
def run(self): | |
ctx = zmq.Context() | |
pub = ctx.socket(zmq.ROUTER) | |
#pub.setsockopt(zmq.SNDHWM, 1) | |
#pub.setsockopt(zmq.LINGER, 1) | |
pub.setsockopt(zmq.IDENTITY, 'writer') | |
pub.connect('ipc:///tmp/zmqtest') | |
time.sleep(0.1) | |
message = 'x'*512*1024 | |
for ii in xrange(0, self.count): | |
self.cc += 1 | |
pub.send_multipart(['reader', message], copy=False) | |
pub.close() | |
ctx.term() | |
# ZeroMQ PUSH/PULL | |
class ZmqReaderPull(multiprocessing.Process): | |
def __init__(self, count): | |
super(ZmqReaderPull, self).__init__() | |
self.count = count | |
self.cc = 0 | |
def run(self): | |
ctx = zmq.Context() | |
sub = ctx.socket(zmq.PULL) | |
#sub.setsockopt(zmq.RCVHWM, 1) | |
#sub.setsockopt(zmq.LINGER, 1) | |
sub.bind('ipc:///tmp/zmqtest') | |
while self.cc < self.count: | |
self.cc += 1 | |
msg = sub.recv(copy=False) | |
sub.close() | |
ctx.term() | |
class ZMQWriterPush(multiprocessing.Process): | |
def __init__(self, count): | |
super(ZMQWriterPush, self).__init__() | |
self.count = count | |
self.cc = 0 | |
def run(self): | |
ctx = zmq.Context() | |
pub = ctx.socket(zmq.PUSH) | |
#pub.setsockopt(zmq.SNDHWM, 1) | |
#pub.setsockopt(zmq.LINGER, 1) | |
pub.connect('ipc:///tmp/zmqtest') | |
time.sleep(0.1) | |
message = 'x'*512*1024 | |
for ii in xrange(0, self.count): | |
self.cc += 1 | |
pub.send(message, copy=False) | |
pub.close() | |
ctx.term() | |
if __name__=='__main__': | |
time.sleep(3) | |
log.info('- Running multiprocessing Pipe() benchmark:') | |
for count in [10**4, 10**5, 10**6]: | |
_start = time.time() | |
output_p, input_p = multiprocessing.Pipe() | |
reader_p = multiprocessing.Process(target=pipe_reader, args=((output_p, input_p),count,)) | |
reader_p.start() | |
output_p.close() | |
log.info('Sending %i requests' % count) | |
pipe_writer(count, input_p) | |
# Ask the reader to stop when it reads EOF | |
input_p.close() | |
reader_p.join() | |
log.info( | |
'Sending %s numbers to Pipe() took %s seconds' % ( | |
count, (time.time() - _start) | |
)) | |
time.sleep(3) | |
log.info('- Running ZMQ() DEALER/DEALER benchmark:') | |
for count in [10**4, 10**5, 10**6]: | |
_start = time.time() | |
reader_p = ZmqReaderDealer(count=count) | |
reader_p.start() | |
time.sleep(0.1) | |
log.info('Sending %i ZMQ requests' % count) | |
zmq_writer = ZMQWriterDealer(count=count) | |
zmq_writer.start() | |
reader_p.join() | |
log.info( | |
'Sending %s numbers via ZMQ() took %s seconds' % ( | |
count, (time.time() - _start) | |
)) | |
time.sleep(3) | |
log.info('- Running ZMQ() ROUTER/ROUTER benchmark:') | |
for count in [10**4, 10**5, 10**6]: | |
_start = time.time() | |
reader_p = ZmqReaderRouter(count=count) | |
reader_p.start() | |
time.sleep(0.1) | |
log.info('Sending %i ZMQ requests' % count) | |
zmq_writer = ZMQWriterRouter(count=count) | |
zmq_writer.start() | |
reader_p.join() | |
log.info( | |
'Sending %s numbers via ZMQ() took %s seconds' % ( | |
count, (time.time() - _start) | |
)) | |
time.sleep(3) | |
log.info('- Running ZMQ() PUSH/PULL benchmark:') | |
for count in [10**4, 10**5, 10**6]: | |
_start = time.time() | |
reader_p = ZmqReaderPull(count=count) | |
reader_p.start() | |
time.sleep(0.1) | |
log.info('Sending %i ZMQ requests' % count) | |
zmq_writer = ZMQWriterPush(count=count) | |
zmq_writer.start() | |
reader_p.join() | |
log.info( | |
'Sending %s numbers via ZMQ() took %s seconds' % ( | |
count, (time.time() - _start) | |
)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment