Skip to content

Instantly share code, notes, and snippets.

@mydreambei-ai
Last active July 27, 2016 02:50
Show Gist options
  • Save mydreambei-ai/897d53adf6c643a4fbe77756cc1a20b7 to your computer and use it in GitHub Desktop.
Save mydreambei-ai/897d53adf6c643a4fbe77756cc1a20b7 to your computer and use it in GitHub Desktop.
pipe vs socketpair
import threading
class Consumer(threading.Thread):
def __init__(self, sock):
super(Consumer, self).__init__()
self.sock = sock
def run(self):
while True:
try:
data = self.sock.recv()
except EOFError:
break
self.sock.close()
class Producer(threading.Thread):
def __init__(self, sock):
super(Producer, self).__init__()
self.sock = sock
def run(self):
count = 100000
while count > 0:
self.sock.send({"name":"1"})
count -= 1
self.sock.close()
if __name__ == '__main__':
from multiprocessing.connection import Connection
import os
r, w = os.pipe()
consumer = Consumer(Connection(r))
producer = Producer(Connection(w))
consumer.start()
producer.start()
consumer.join()
producer.join()
import threading
import msgpack
EOF = b'Bye'
class Message(object):
def __init__(self, data):
self.data = msgpack.packb(data) + EOF
@staticmethod
def upackb(data, **kwargs):
return msgpack.unpackb(data[:len(data) - len(EOF)], **kwargs)
class Consumer(threading.Thread):
def __init__(self, sock):
super(Consumer, self).__init__()
self.sock = sock
def run(self):
item = b''
while True:
data = self.sock.recv(10)
if not data:
break
else:
item += data
try:
index = item.index(EOF) + len(EOF)
except ValueError:
continue
else:
data = Message.upackb(item[:index])
item = item[index:]
print(data)
self.sock.close()
class Producer(threading.Thread):
def __init__(self, sock):
super(Producer, self).__init__()
self.sock = sock
def run(self):
count = 100000
while count > 0:
self.sock.send(Message({"name": "lmy"}).data)
# time.sleep(0.5)
count -= 1
self.sock.close()
if __name__ == '__main__':
import socket
r, w = socket.socketpair()
consumer = Consumer(r)
producer = Producer(w)
consumer.start()
producer.start()
consumer.join()
producer.join()
import time
from multiprocessing import Process
import msgpack
EOF = b'Bye'
'''
issues
1. Producer must use socket.shutdown method if using close will blocking
'''
class Message(object):
def __init__(self, data):
self.data = msgpack.packb(data) + EOF
@staticmethod
def upackb(data, **kwargs):
return msgpack.unpackb(data[:len(data) - len(EOF)], **kwargs)
class Consumer(Process):
def __init__(self, sock):
super(Consumer, self).__init__()
self.sock = sock
def run(self):
item = b''
while True:
data = self.sock.recv(1024)
if not data:
break
else:
item += data
item = self.read_data(item)
self.sock.close()
def read_data(self, item):
try:
index = item.index(EOF) + len(EOF)
except ValueError:
return item
else:
data = Message.upackb(item[:index])
self.do_data(data)
return self.read_data(item[index:])
def do_data(self, data):
print(data)
class Producer(Process):
def __init__(self, sock):
super(Producer, self).__init__()
self.sock = sock
def run(self):
count = 4
while count > 0:
# time.sleep(0.1)
self.sock.send(Message({"name": "lmy"}).data)
count -= 1
self.sock.shutdown(1)
if __name__ == '__main__':
import socket
r, w = socket.socketpair()
consumer = Consumer(r)
producer = Producer(w)
consumer.start()
producer.start()
producer.join()
consumer.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment