Created
January 12, 2011 23:08
Revisions
-
tmc renamed this gist
Jan 12, 2011 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
tmc created this gist
Jan 12, 2011 .There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,140 @@ import sys import gevent from gevent.monkey import patch_all; patch_all() from gevent import server, event, socket from multiprocessing import Process, current_process, cpu_count """ Simple multiprocess StreamServer that proxies messages between clients. Avoids using a multiprocessing.Event since it blocks on a semaphore. """ def note(format, *args): sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args)) class ChatHandler(object): def __init__(self, master_location): self._master_location = master_location self.green_event = event.Event() self.messages = [] gevent.spawn_later(1, self.communicate_with_master) def add_message(self, author, message): note('%s: %s' % (author, message)) self.master_socket.send('From %s %s > %s \r\n' % (author, current_process().name, message)) def communicate_with_master(self): self.master_socket = socket.create_connection(self._master_location) f = self.master_socket.makefile() while True: m = f.readline() if not m: note('client disconnected') break self.messages.append(m) self.green_event.set() self.green_event.clear() def wait_for_green_events(self, fileobj): print 'wait for green' idx = len(self.messages) while True: self.green_event.wait() try: for message in self.messages[idx:]: fileobj.write('> '+ message) fileobj.flush() idx = len(self.messages) except: print 'error writing to a client' break def handle(self, fileobj, address): while True: note('reading from client') line = fileobj.readline() if not line: print "client disconnected" break if line.strip().lower() == 'quit': print "client quit" break self.add_message(address, line) fileobj.close() def __call__(self, socket, address): print 'New connection from %s:%s' % address fileobj = socket.makefile() fileobj.write('Welcome to the chat server! Type quit to exit.\r\n') fileobj.write('In %s\n\n' % current_process().name) fileobj.flush() gevent.spawn(self.wait_for_green_events, fileobj) gevent.spawn(self.handle, fileobj, address).join() class master(object): def __init__(self): self.children = [] def __call__(self, socket, address): self.children.append((socket, address)) gevent.spawn(self.listen, socket, address) def write_to_children(self, author, message): print 'writing message to children', message, len(self.children) for c in self.children: print 'sending to', c gevent.spawn(c[0].send, message) def listen(self, socket, address): f = socket.makefile() while True: line = f.readline() if not line: print 'client died' break self.write_to_children(address, line) master_listen = ('127.0.0.1', 4002) master = server.StreamServer(master_listen, master()) Chat = ChatHandler(master_listen) s = server.StreamServer(('127.0.0.1', 4001), Chat) s.pre_start() def serve_forever(server, master_listen): Chat = ChatHandler(master_listen) server.set_handle(Chat) try: gevent.spawn_later(1, server.start_accepting) try: server._stopped_event.wait() except: raise except KeyboardInterrupt: pass number_of_processes = cpu_count() - 1 number_of_processes = 2 print 'Starting %s processes' % number_of_processes processes = [Process(target=serve_forever, args=(s, master_listen)) for i in range(number_of_processes)] for process in processes: process.start() s.start() master.start() gevent.joinall([master._stopped_event, s._stopped_event])