Skip to content

Instantly share code, notes, and snippets.

@tmc
Created January 12, 2011 23:08

Revisions

  1. tmc renamed this gist Jan 12, 2011. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. tmc created this gist Jan 12, 2011.
    140 changes: 140 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,140 @@
    import sys
    import gevent
    from gevent.monkey import patch_all; patch_all()
    from gevent import server, event, socket
    from multiprocessing import Process, current_process, cpu_count

    """
    Simple multiprocess StreamServer that proxies messages between clients.

    Avoids using a multiprocessing.Event since it blocks on a semaphore.

    """

    def note(format, *args):
    sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))

    class ChatHandler(object):

    def __init__(self, master_location):
    self._master_location = master_location
    self.green_event = event.Event()
    self.messages = []
    gevent.spawn_later(1, self.communicate_with_master)

    def add_message(self, author, message):
    note('%s: %s' % (author, message))
    self.master_socket.send('From %s %s > %s \r\n' % (author, current_process().name, message))

    def communicate_with_master(self):
    self.master_socket = socket.create_connection(self._master_location)
    f = self.master_socket.makefile()
    while True:
    m = f.readline()
    if not m:
    note('client disconnected')
    break

    self.messages.append(m)
    self.green_event.set()
    self.green_event.clear()

    def wait_for_green_events(self, fileobj):
    print 'wait for green'
    idx = len(self.messages)
    while True:
    self.green_event.wait()
    try:
    for message in self.messages[idx:]:
    fileobj.write('> '+ message)
    fileobj.flush()
    idx = len(self.messages)
    except:
    print 'error writing to a client'
    break


    def handle(self, fileobj, address):
    while True:
    note('reading from client')
    line = fileobj.readline()
    if not line:
    print "client disconnected"
    break
    if line.strip().lower() == 'quit':
    print "client quit"
    break

    self.add_message(address, line)

    fileobj.close()

    def __call__(self, socket, address):
    print 'New connection from %s:%s' % address

    fileobj = socket.makefile()
    fileobj.write('Welcome to the chat server! Type quit to exit.\r\n')
    fileobj.write('In %s\n\n' % current_process().name)
    fileobj.flush()

    gevent.spawn(self.wait_for_green_events, fileobj)
    gevent.spawn(self.handle, fileobj, address).join()


    class master(object):

    def __init__(self):
    self.children = []

    def __call__(self, socket, address):
    self.children.append((socket, address))
    gevent.spawn(self.listen, socket, address)

    def write_to_children(self, author, message):
    print 'writing message to children', message, len(self.children)
    for c in self.children:
    print 'sending to', c
    gevent.spawn(c[0].send, message)

    def listen(self, socket, address):
    f = socket.makefile()
    while True:
    line = f.readline()
    if not line:
    print 'client died'
    break
    self.write_to_children(address, line)


    master_listen = ('127.0.0.1', 4002)
    master = server.StreamServer(master_listen, master())

    Chat = ChatHandler(master_listen)

    s = server.StreamServer(('127.0.0.1', 4001), Chat)
    s.pre_start()

    def serve_forever(server, master_listen):
    Chat = ChatHandler(master_listen)
    server.set_handle(Chat)
    try:
    gevent.spawn_later(1, server.start_accepting)
    try:
    server._stopped_event.wait()
    except:
    raise
    except KeyboardInterrupt:
    pass

    number_of_processes = cpu_count() - 1
    number_of_processes = 2
    print 'Starting %s processes' % number_of_processes
    processes = [Process(target=serve_forever, args=(s, master_listen)) for i in range(number_of_processes)]


    for process in processes:
    process.start()
    s.start()
    master.start()

    gevent.joinall([master._stopped_event, s._stopped_event])