Last active
January 15, 2022 21:14
-
-
Save mivade/782d2bf5ab01b649b138e4a1e8932f1d to your computer and use it in GitHub Desktop.
Interprocess communication speed comparisons
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Event, Process, Queue | |
import time | |
import zmq | |
class BaseActor(Process): | |
ready = Event() | |
def handle(self, msg): | |
print("dt =", time.time() - msg['timestamp']) | |
def main(n_bytes: int): | |
raise NotImplementedError | |
class QActor(BaseActor): | |
def __init__(self, queue: Queue, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.queue = queue | |
def run(self): | |
self.ready.set() | |
while True: | |
msg = self.queue.get() | |
if msg['data'] is None: | |
break | |
self.handle(msg) | |
@staticmethod | |
def main(n_bytes: int): | |
queue = Queue() | |
def put(data): | |
queue.put({ | |
'timestamp': time.time(), | |
'data': data, | |
}) | |
actor = QActor(queue) | |
actor.start() | |
actor.ready.wait() | |
payload = b'*' * n_bytes | |
put(payload) | |
put(None) | |
actor.join() | |
class ZActor(BaseActor): | |
def __init__(self, address, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.address = address | |
def run(self): | |
ctx = zmq.Context() | |
sock = ctx.socket(zmq.PULL) | |
sock.connect(self.address) | |
self.ready.set() | |
while True: | |
msg = sock.recv_pyobj() | |
if msg['data'] is None: | |
break | |
self.handle(msg) | |
@staticmethod | |
def main(n_bytes: int): | |
address = 'tcp://127.0.0.1:8778' | |
# address = 'ipc://endpoint' | |
ctx = zmq.Context() | |
sock = ctx.socket(zmq.PUSH) | |
sock.bind(address) | |
def send(data): | |
sock.send_pyobj({ | |
'timestamp': time.time(), | |
'data': data | |
}) | |
actor = ZActor(address) | |
actor.start() | |
actor.ready.wait() | |
payload = b'*' * n_bytes | |
send(payload) | |
send(None) | |
actor.join() | |
if __name__ == "__main__": | |
from argparse import ArgumentParser | |
parser = ArgumentParser() | |
parser.add_argument('n_bytes', type=int, help='number of bytes to send') | |
args = parser.parse_args() | |
print("IPC via queue") | |
QActor.main(args.n_bytes) | |
print("IPC via ZMQ sockets") | |
ZActor.main(args.n_bytes) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment