Skip to content

Instantly share code, notes, and snippets.

@a740122
Forked from tmc/gist:777085
Created December 4, 2015 07:45
Show Gist options
  • Save a740122/dffcfd83285e91822241 to your computer and use it in GitHub Desktop.
Save a740122/dffcfd83285e91822241 to your computer and use it in GitHub Desktop.
multiprocessing gevent chat example
import sys
import gevent
from gevent.monkey import patch_all; patch_all()
from gevent import server, event, socket
from multiprocessing import Process, current_process, cpu_count
"""
Simple multiprocess StreamServer that proxies messages between clients.
Avoids using a multiprocessing.Event since it blocks on a semaphore.
"""
def note(format, *args):
sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))
class ChatHandler(object):
def __init__(self, master_location):
self._master_location = master_location
self.green_event = event.Event()
self.messages = []
gevent.spawn_later(1, self.communicate_with_master)
def add_message(self, author, message):
note('%s: %s' % (author, message))
self.master_socket.send('From %s %s > %s \r\n' % (author, current_process().name, message))
def communicate_with_master(self):
self.master_socket = socket.create_connection(self._master_location)
f = self.master_socket.makefile()
while True:
m = f.readline()
if not m:
note('client disconnected')
break
self.messages.append(m)
self.green_event.set()
self.green_event.clear()
def wait_for_green_events(self, fileobj):
print 'wait for green'
idx = len(self.messages)
while True:
self.green_event.wait()
try:
for message in self.messages[idx:]:
fileobj.write('> '+ message)
fileobj.flush()
idx = len(self.messages)
except:
print 'error writing to a client'
break
def handle(self, fileobj, address):
while True:
note('reading from client')
line = fileobj.readline()
if not line:
print "client disconnected"
break
if line.strip().lower() == 'quit':
print "client quit"
break
self.add_message(address, line)
fileobj.close()
def __call__(self, socket, address):
print 'New connection from %s:%s' % address
fileobj = socket.makefile()
fileobj.write('Welcome to the chat server! Type quit to exit.\r\n')
fileobj.write('In %s\n\n' % current_process().name)
fileobj.flush()
gevent.spawn(self.wait_for_green_events, fileobj)
gevent.spawn(self.handle, fileobj, address).join()
class master(object):
def __init__(self):
self.children = []
def __call__(self, socket, address):
self.children.append((socket, address))
gevent.spawn(self.listen, socket, address)
def write_to_children(self, author, message):
print 'writing message to children', message, len(self.children)
for c in self.children:
print 'sending to', c
gevent.spawn(c[0].send, message)
def listen(self, socket, address):
f = socket.makefile()
while True:
line = f.readline()
if not line:
print 'client died'
break
self.write_to_children(address, line)
master_listen = ('127.0.0.1', 4002)
master = server.StreamServer(master_listen, master())
Chat = ChatHandler(master_listen)
s = server.StreamServer(('127.0.0.1', 4001), Chat)
s.pre_start()
def serve_forever(server, master_listen):
Chat = ChatHandler(master_listen)
server.set_handle(Chat)
try:
gevent.spawn_later(1, server.start_accepting)
try:
server._stopped_event.wait()
except:
raise
except KeyboardInterrupt:
pass
number_of_processes = cpu_count() - 1
number_of_processes = 2
print 'Starting %s processes' % number_of_processes
processes = [Process(target=serve_forever, args=(s, master_listen)) for i in range(number_of_processes)]
for process in processes:
process.start()
s.start()
master.start()
gevent.joinall([master._stopped_event, s._stopped_event])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment