Skip to content

Instantly share code, notes, and snippets.

@123Daoxyz
Created April 8, 2014 16:57
Show Gist options
  • Save 123Daoxyz/10155872 to your computer and use it in GitHub Desktop.
Save 123Daoxyz/10155872 to your computer and use it in GitHub Desktop.
udp server based on tornado
#!/usr/bin/env python
# encoding: utf-8
import time
import socket
from tornado.iostream import IOStream
from tornado.ioloop import IOLoop
from tornado import stack_context
import functools
import collections
class UDPRequest(object):
    """A single outgoing UDP request: destination address, port and payload.

    Reading ``data`` always yields the payload terminated by a blank line
    (CRLF CRLF). The terminator is appended on access when the stored
    payload does not already end with it; the stored value itself is left
    untouched.
    """

    def __init__(self, addr, port, data):
        """Store the destination ``addr``/``port`` and the ``data`` payload."""
        self.addr = addr
        self.port = port
        self.data = data

    def __getattribute__(self, name):
        """Return attribute ``name``, terminating ``data`` on the fly."""
        value = object.__getattribute__(self, name)
        if name == 'data':
            # Use a terminator of the same type as the payload so that both
            # text and byte-string payloads are supported.
            terminator = b'\r\n\r\n' if isinstance(value, bytes) else '\r\n\r\n'
            # endswith() rather than comparing rfind() with len(data) - 4:
            # for a 3-character payload without a terminator both of those
            # evaluate to -1, so the terminator was silently left off.
            if not value.endswith(terminator):
                value += terminator
        return value
class _UDPConnection(object):
def __init__(self,io_loop,client,request,release_callback,
final_callback,max_buffer_size):
self.start_time = time.time()
self.io_loop = io_loop
self.client = client
self.request = request
self.release_callback = release_callback
self.final_callback = final_callback
addrinfo = socket.getaddrinfo(request.addr,request.port,
socket.AF_INET,socket.SOCK_DGRAM,0,0)
af,socktype,proto,canonname,sockaddr = addrinfo[0]
print af, socktype, proto, canonname, sockaddr
self.stream = IOStream(socket.socket(af,socktype,proto),
io_loop=self.io_loop,max_buffer_size=2500)
self.stream.connect(sockaddr,self._on_connect)
def _on_connect(self):
self.stream.write(self.request.data)
# FIXME 这里应该根据打开的udp端口,做一个tcp的listen来保证端口打开后不会消失?
# self.stream.read_until('\r\n\r\n',self._on_response)
def _on_response(self,data):
if self.release_callback is not None:
release_callback = self.release_callback
self.release_callback = None
release_callback()
if self.final_callback:
self.final_callback()
self.stream.close()
class AsyncUDPClient(object):
    """Queueing UDP client that caps the number of concurrent requests.

    Requests passed to ``fetch`` are queued and started as soon as fewer
    than ``max_clients`` requests are active.
    """

    def __init__(self, io_loop=None, max_clients=10, max_buffer_size=2500):
        """Create a client.

        Args:
            io_loop: IOLoop to use; falls back to ``IOLoop.instance()``.
            max_clients: maximum number of simultaneously active requests.
            max_buffer_size: buffer limit passed to every connection.
        """
        self.io_loop = io_loop or IOLoop.instance()
        self.max_clients = max_clients
        self.queue = collections.deque()
        # Maps an opaque per-request key to its (request, callback) pair.
        self.active = {}
        self.max_buffer_size = max_buffer_size

    def fetch(self, request, callback, **kwargs):
        """Queue ``request`` and start it as soon as a slot is free.

        ``callback`` is wrapped with ``stack_context.wrap`` and handed to the
        connection as its final callback. Extra keyword arguments are
        accepted but not used. Nothing is returned.
        """
        callback = stack_context.wrap(callback)
        self.queue.append((request, callback))
        self._process_queue()

    def _process_queue(self):
        """Start queued requests while fewer than ``max_clients`` are active."""
        with stack_context.NullContext():
            while self.queue and len(self.active) < self.max_clients:
                request, callback = self.queue.popleft()
                # A fresh object() is a unique, hashable handle for this slot.
                key = object()
                self.active[key] = (request, callback)
                _UDPConnection(self.io_loop, self, request,
                               functools.partial(self._release_fetch, key),
                               callback,
                               self.max_buffer_size)

    def _release_fetch(self, key):
        """Free the slot identified by ``key`` and start any queued request."""
        del self.active[key]
        self._process_queue()
def print_hello():
    """Demo completion callback: write a fixed greeting to standard output."""
    greeting = "hello world"
    print(greeting)
# Demo: queue one request for a UDP endpoint on 127.0.0.1:8888, then run the
# IOLoop. (A repeat-forever loop around the request was left disabled.)
http_client = AsyncUDPClient()
# The payload already ends with the blank-line terminator.
request = UDPRequest("127.0.0.1", 8888, "hehe\r\n\r\n")
# fetch() has no return statement, so `response` is always None here.
response = http_client.fetch(request, print_hello)
# Hands control to the Tornado IOLoop.
IOLoop.instance().start()
#!/usr/bin/env python
# encoding: utf-8
"""A non-blocking, single-threaded UDP server."""
from __future__ import absolute_import, division, print_function, with_statement
import errno
import os
import socket
import ssl
from tornado.log import app_log
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream, SSLIOStream
from tornado.netutil import ssl_wrap_socket
from tornado import process
from tornado.netutil import set_close_exec
class UDPServer(object):
    """Non-blocking UDP server that dispatches datagrams from an IOLoop.

    Sockets are created with ``bind`` and registered with the IOLoop either
    immediately (when the server is already started) or by ``start``.
    """

    def __init__(self, io_loop=None):
        """Create a server; ``io_loop`` defaults to ``IOLoop.instance()``
        the first time sockets are added."""
        self.io_loop = io_loop
        self._sockets = {}  # fd -> socket object
        self._pending_sockets = []
        self._started = False

    def add_sockets(self, sockets):
        """Register each socket in ``sockets`` so that incoming datagrams
        are delivered to ``_on_recive``."""
        if self.io_loop is None:
            self.io_loop = IOLoop.instance()
        for sock in sockets:
            self._sockets[sock.fileno()] = sock
            add_accept_handler(sock, self._on_recive,
                               io_loop=self.io_loop)

    def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=25):
        """Bind datagram sockets for ``port``/``address``.

        The sockets are registered right away if the server has been
        started, otherwise they are held until ``start`` is called.
        ``backlog`` is simply forwarded to ``bind_sockets``.
        """
        sockets = bind_sockets(port, address=address, family=family,
                               backlog=backlog)
        if self._started:
            self.add_sockets(sockets)
        else:
            self._pending_sockets.extend(sockets)

    def start(self, num_processes=1):
        """Start serving on every socket bound so far.

        When ``num_processes`` is not 1 the process is forked through
        ``tornado.process.fork_processes`` before the sockets are registered.
        May only be called once.
        """
        assert not self._started
        self._started = True
        if num_processes != 1:
            process.fork_processes(num_processes)
        sockets = self._pending_sockets
        self._pending_sockets = []
        self.add_sockets(sockets)

    def stop(self):
        """Unregister and close every socket this server has registered.

        Safe to call more than once: the socket table is emptied, so a
        repeated call does not touch already-closed sockets.
        """
        # items() works on both Python 2 and 3 (iteritems() is 2-only);
        # snapshot it because the table is cleared right after the loop.
        for fd, sock in list(self._sockets.items()):
            self.io_loop.remove_handler(fd)
            sock.close()
        self._sockets.clear()

    def _on_recive(self, data, address):
        """Handle one datagram: print it and send a fixed reply to the
        sender over a short-lived TCP connection.

        ``address`` is the sender address reported by ``recvfrom``; only its
        first two items (host, port) are used, and the reply socket is
        always AF_INET. Connection errors propagate to the caller.
        """
        print(data)
        host, port = address[0], address[1]
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((host, port))
            # sendall() guarantees the whole reply is written; the bytes
            # literal keeps the payload valid on Python 3 as well.
            sock.sendall(b"abcde\r\n\r\n")
        finally:
            # The reply socket used to be left open on every datagram.
            sock.close()
def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=25):
    """Create non-blocking datagram sockets bound to ``port``.

    One socket is created for every distinct result ``getaddrinfo`` returns
    for ``address`` (an empty string is treated like None). ``family``
    restricts the address family. ``backlog`` is accepted but not used in
    this function.

    Returns the list of bound sockets.
    """
    host = None if address == "" else address
    # AI_ADDRCONFIG is only added on platforms whose socket module has it.
    lookup_flags = socket.AI_PASSIVE | getattr(socket, "AI_ADDRCONFIG", 0)
    candidates = set(socket.getaddrinfo(host, port, family,
                                        socket.SOCK_DGRAM, 0, lookup_flags))
    bound = []
    for af, socktype, proto, canonname, sockaddr in candidates:
        udp_sock = socket.socket(af, socktype, proto)
        set_close_exec(udp_sock.fileno())
        if os.name != 'nt':
            udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if af == socket.AF_INET6 and hasattr(socket, "IPPROTO_IPV6"):
            # Keep the IPv6 socket from also claiming the IPv4 address.
            udp_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        udp_sock.setblocking(0)
        udp_sock.bind(sockaddr)
        bound.append(udp_sock)
    return bound
if hasattr(socket, 'AF_UNIX'):
    def bind_unix_socket(file, mode=0o600, backlog=128):
        """Create a non-blocking unix datagram socket bound to ``file``.

        If ``file`` already exists and is a socket it is removed first; if
        it exists and is anything else, ValueError is raised. After binding,
        the file's permissions are set to ``mode``.

        ``backlog`` is kept for backward compatibility and is ignored:
        datagram sockets do not support ``listen()``.

        Returns the bound socket. The socket is closed again if any step
        fails.
        """
        # Function-scope import: the module's import block does not pull in
        # stat, which provides S_ISSOCK.
        import stat

        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            set_close_exec(sock.fileno())
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(0)
            try:
                st = os.stat(file)
            except OSError as err:
                # A missing file is the normal case; anything else is fatal.
                if err.errno != errno.ENOENT:
                    raise
            else:
                # S_ISSOCK lives in the stat module, not on the os.stat()
                # result that it was previously looked up on.
                if stat.S_ISSOCK(st.st_mode):
                    os.remove(file)
                else:
                    raise ValueError(
                        "File %s exists and is not a socket" % file)
            sock.bind(file)
            os.chmod(file, mode)
        except Exception:
            sock.close()
            raise
        return sock
def add_accept_handler(sock, callback, io_loop=None, buffer_size=2500):
    """Register ``sock`` with the IOLoop and deliver datagrams to ``callback``.

    Whenever the socket becomes readable, datagrams are read with
    ``recvfrom`` until the socket would block, and ``callback(data, address)``
    is invoked for each one.

    Args:
        sock: a non-blocking datagram socket.
        callback: callable taking ``(data, address)``.
        io_loop: IOLoop to register with; defaults to ``IOLoop.instance()``.
        buffer_size: maximum number of bytes read per datagram.
    """
    if io_loop is None:
        io_loop = IOLoop.instance()

    def accept_handler(fd, events):
        # Drain everything that is currently queued on the socket.
        while True:
            try:
                data, address = sock.recvfrom(buffer_size)
            except socket.error as e:
                # Nothing left to read for now; any other error is re-raised.
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            callback(data, address)

    io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)
#!/usr/bin/env python
# encoding: utf-8
from tornado_udp import UDPServer
from tornado.ioloop import IOLoop
# Demo entry point: bind the UDP server to port 8888 and run the IOLoop.
server = UDPServer()
server.bind(8888)
# Any process count other than 1 makes start() call
# tornado.process.fork_processes with that count.
# NOTE(review): 0 presumably means one child per CPU core - confirm in the
# Tornado documentation.
server.start(0)
IOLoop.instance().start()
@shajiquan
Copy link

bug在哪里?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment