Skip to content

Instantly share code, notes, and snippets.

@pipoket
Created November 25, 2011 07:40
Show Gist options
  • Save pipoket/1393004 to your computer and use it in GitHub Desktop.
Save pipoket/1393004 to your computer and use it in GitHub Desktop.
UDP server implementation for gevent. Copied from http://code.google.com/p/gevent/issues/detail?id=50
# Copyright (c) 2009-2010 Denis Bilenko. See LICENSE for details.
"""UDP (datagram) server for gevent."""
import sys
import errno
import traceback
from gevent import socket
from gevent import core
from gevent.baseserver import BaseServer
__all__ = ['DgramServer']
class DgramServer(BaseServer):
    """A generic UDP server.

    Receives UDP datagrams on a datagram socket and calls the user-provided
    *handle* once per datagram with 2 arguments: the message payload and the
    sender's address.

    If a *spawn* callable is configured, the handler is started through it;
    if *spawn* is None the handler runs inline inside the receive callback,
    and any exception it raises is treated like a receive error.

    Errors raised by ``recvfrom()`` or by *spawn* make the server stop
    receiving for a short time. The pause starts at :attr:`min_delay` and
    doubles with each successive error until it reaches :attr:`max_delay`.
    A successful ``recvfrom()`` resets it to :attr:`min_delay`.

    NOTE(review): *listener* is presumably expected to be an already-bound
    datagram socket (as in the usage example); what ``BaseServer`` does with
    a bare address is not visible here -- confirm before relying on it.
    """

    # Number of seconds to pause after an error in the recvfrom() call.
    # For consecutive errors the delay doubles until it reaches max_delay;
    # when recvfrom() finally succeeds the delay is reset to min_delay.
    min_delay = 0.01
    max_delay = 1

    # Buffer size passed to recvfrom(); a datagram longer than this cannot be
    # received in full. Override on a subclass or instance to accept larger
    # datagrams.
    max_packet_size = 1024

    # errno values meaning "nothing to read right now" on a non-blocking
    # socket. EWOULDBLOCK differs from EAGAIN on some platforms.
    _retry_errnos = (errno.EAGAIN, errno.EWOULDBLOCK)

    # errno values after which the socket is unusable and the server is killed.
    _fatal_errnos = (errno.EBADF, errno.EINVAL, errno.ENOTSOCK)

    def __init__(self, listener, handle=None, backlog=None, spawn='default', **ssl_args):
        """Create the server.

        *listener*, *handle*, *backlog* and *spawn* are forwarded to
        ``BaseServer.__init__``. ``**ssl_args`` is accepted only for
        signature compatibility and is ignored: this class implements no SSL.
        """
        BaseServer.__init__(self, listener, handle=handle, backlog=backlog, spawn=spawn)
        self.delay = self.min_delay
        self._recv_event = None
        self._start_receving_timer = None

    def set_listener(self, listener, backlog=None):
        """Install *listener*, then unwrap it to its ``_sock`` attribute if it has one."""
        BaseServer.set_listener(self, listener, backlog=backlog)
        try:
            self.socket = self.socket._sock
        except AttributeError:
            # Either no socket was set or it has no ``_sock`` to unwrap.
            pass

    def set_spawn(self, spawn):
        """Install *spawn*; with a pool, resume receiving whenever its semaphore fires."""
        BaseServer.set_spawn(self, spawn)
        if self.pool is not None:
            self.pool._semaphore.rawlink(self._start_receiving)

    def set_handle(self, handle):
        """Install *handle* and cache it as ``_handle`` for the receive callback."""
        BaseServer.set_handle(self, handle)
        self._handle = self.handle

    @property
    def isStarted(self):
        """True while a read event is registered or a delayed restart is scheduled."""
        return self._recv_event is not None or self._start_receving_timer is not None

    def start_accepting(self):
        """Register the persistent read event on the socket (no-op if already registered)."""
        if self._recv_event is None:
            self._recv_event = core.read_event(self.socket.fileno(), self._do_recv, persist=True)

    def _start_receiving(self, _event):
        """Semaphore callback: resume receiving, provided a socket has been set."""
        if 'socket' not in self.__dict__:
            return
        self.start_accepting()

    def stop_accepting(self):
        """Cancel the read event and any pending restart timer."""
        if self._recv_event is not None:
            self._recv_event.cancel()
            self._recv_event = None
        if self._start_receving_timer is not None:
            self._start_receving_timer.cancel()
            self._start_receving_timer = None

    def _do_recv(self, event, _evtype):
        """Read-event callback: receive one datagram and dispatch it to the handler.

        On failure the error is reported on stderr. Fatal socket errors kill
        the server; any other error pauses receiving for ``self.delay``
        seconds and doubles the delay (capped at ``max_delay``).
        """
        assert event is self._recv_event
        address = None
        try:
            if self.full():
                # No capacity; set_spawn() arranges for the pool's semaphore
                # to call _start_receiving() again.
                self.stop_accepting()
                return
            try:
                msg, address = self.socket.recvfrom(self.max_packet_size)
            except socket.error as err:
                if err.args and err.args[0] in self._retry_errnos:
                    # Spurious wake-up: nothing to read, wait for the next event.
                    sys.exc_clear()
                    return
                raise
            self.delay = self.min_delay
            spawn = self._spawn
            if spawn is None:
                self._handle(msg, address)
            else:
                spawn(self._handle, msg, address)
            return
        except:
            # Deliberately broad: this runs as an event callback, so the
            # failure is reported and handled here rather than propagated.
            traceback.print_exc()
            ex = sys.exc_info()[1]
            if self.is_fatal_error(ex):
                self.kill()
                sys.stderr.write('ERROR: %s failed with %s\n' % (self, str(ex) or repr(ex)))
                return
        try:
            if address is None:
                sys.stderr.write('%s: Failed.\n' % (self, ))
            else:
                sys.stderr.write('%s: Failed to handle request from %s\n' % (self, address, ))
        except Exception:
            traceback.print_exc()
        if self.delay >= 0:
            # Back off: stop now and schedule a restart after the current delay.
            self.stop_accepting()
            self._start_receving_timer = core.timer(self.delay, self.start_accepting)
            self.delay = min(self.max_delay, self.delay * 2)
        sys.exc_clear()

    def is_fatal_error(self, ex):
        """Return True if *ex* is a socket error with EBADF, EINVAL or ENOTSOCK."""
        if not isinstance(ex, socket.error) or not ex.args:
            return False
        return ex.args[0] in self._fatal_errnos
def _import_sslold_wrap_socket():
try:
from gevent.sslold import wrap_socket
return wrap_socket
except ImportError:
pass
import gevent
from gevent import socket
from dgram_server import DgramServer
# this handler is called by the server once for each received datagram
def echo(msg, address):
    """Print the sender's address followed by the received datagram payload."""
    # Address first, then payload, so the output matches the "from" label.
    print('New message from %s: %s' % (address, msg))
if __name__ == '__main__':
    # Create the UDP socket ourselves, enable SO_BROADCAST on it and bind it
    # before handing it to the server.
    listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    listener.bind(('192.168.1.228', 6000))
    echo_server = DgramServer(listener, echo)
    # start() would run the server asynchronously; this script has nothing
    # else to do, so it blocks in serve_forever() instead.
    print('Starting echo server on port 6000')
    echo_server.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment