Last active
July 27, 2016 02:50
-
-
Save mydreambei-ai/897d53adf6c643a4fbe77756cc1a20b7 to your computer and use it in GitHub Desktop.
pipe vs socketpair
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
class Consumer(threading.Thread): | |
def __init__(self, sock): | |
super(Consumer, self).__init__() | |
self.sock = sock | |
def run(self): | |
while True: | |
try: | |
data = self.sock.recv() | |
except EOFError: | |
break | |
self.sock.close() | |
class Producer(threading.Thread): | |
def __init__(self, sock): | |
super(Producer, self).__init__() | |
self.sock = sock | |
def run(self): | |
count = 100000 | |
while count > 0: | |
self.sock.send({"name":"1"}) | |
count -= 1 | |
self.sock.close() | |
if __name__ == '__main__': | |
from multiprocessing.connection import Connection | |
import os | |
r, w = os.pipe() | |
consumer = Consumer(Connection(r)) | |
producer = Producer(Connection(w)) | |
consumer.start() | |
producer.start() | |
consumer.join() | |
producer.join() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import msgpack | |
EOF = b'Bye' | |
class Message(object): | |
def __init__(self, data): | |
self.data = msgpack.packb(data) + EOF | |
@staticmethod | |
def upackb(data, **kwargs): | |
return msgpack.unpackb(data[:len(data) - len(EOF)], **kwargs) | |
class Consumer(threading.Thread): | |
def __init__(self, sock): | |
super(Consumer, self).__init__() | |
self.sock = sock | |
def run(self): | |
item = b'' | |
while True: | |
data = self.sock.recv(10) | |
if not data: | |
break | |
else: | |
item += data | |
try: | |
index = item.index(EOF) + len(EOF) | |
except ValueError: | |
continue | |
else: | |
data = Message.upackb(item[:index]) | |
item = item[index:] | |
print(data) | |
self.sock.close() | |
class Producer(threading.Thread): | |
def __init__(self, sock): | |
super(Producer, self).__init__() | |
self.sock = sock | |
def run(self): | |
count = 100000 | |
while count > 0: | |
self.sock.send(Message({"name": "lmy"}).data) | |
# time.sleep(0.5) | |
count -= 1 | |
self.sock.close() | |
if __name__ == '__main__': | |
import socket | |
r, w = socket.socketpair() | |
consumer = Consumer(r) | |
producer = Producer(w) | |
consumer.start() | |
producer.start() | |
consumer.join() | |
producer.join() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from multiprocessing import Process | |
import msgpack | |
EOF = b'Bye' | |
''' | |
issues | |
1. Producer must use socket.shutdown method if using close will blocking | |
''' | |
class Message(object): | |
def __init__(self, data): | |
self.data = msgpack.packb(data) + EOF | |
@staticmethod | |
def upackb(data, **kwargs): | |
return msgpack.unpackb(data[:len(data) - len(EOF)], **kwargs) | |
class Consumer(Process): | |
def __init__(self, sock): | |
super(Consumer, self).__init__() | |
self.sock = sock | |
def run(self): | |
item = b'' | |
while True: | |
data = self.sock.recv(1024) | |
if not data: | |
break | |
else: | |
item += data | |
item = self.read_data(item) | |
self.sock.close() | |
def read_data(self, item): | |
try: | |
index = item.index(EOF) + len(EOF) | |
except ValueError: | |
return item | |
else: | |
data = Message.upackb(item[:index]) | |
self.do_data(data) | |
return self.read_data(item[index:]) | |
def do_data(self, data): | |
print(data) | |
class Producer(Process): | |
def __init__(self, sock): | |
super(Producer, self).__init__() | |
self.sock = sock | |
def run(self): | |
count = 4 | |
while count > 0: | |
# time.sleep(0.1) | |
self.sock.send(Message({"name": "lmy"}).data) | |
count -= 1 | |
self.sock.shutdown(1) | |
if __name__ == '__main__': | |
import socket | |
r, w = socket.socketpair() | |
consumer = Consumer(r) | |
producer = Producer(w) | |
consumer.start() | |
producer.start() | |
producer.join() | |
consumer.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment