Created
April 8, 2014 16:57
-
-
Save 123Daoxyz/10155872 to your computer and use it in GitHub Desktop.
udp server based on tornado
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
import time | |
import socket | |
from tornado.iostream import IOStream | |
from tornado.ioloop import IOLoop | |
from tornado import stack_context | |
import functools | |
import collections | |
class UDPRequest(object): | |
def __init__(self,addr,port,data): | |
self.addr = addr | |
self.port = port | |
self.data = data | |
def __getattribute__(self,name): | |
data = object.__getattribute__(self,name) | |
if name == 'data' and data.rfind('\r\n\r\n') != len(data)-4: | |
data += '\r\n\r\n' | |
return data | |
class _UDPConnection(object): | |
def __init__(self,io_loop,client,request,release_callback, | |
final_callback,max_buffer_size): | |
self.start_time = time.time() | |
self.io_loop = io_loop | |
self.client = client | |
self.request = request | |
self.release_callback = release_callback | |
self.final_callback = final_callback | |
addrinfo = socket.getaddrinfo(request.addr,request.port, | |
socket.AF_INET,socket.SOCK_DGRAM,0,0) | |
af,socktype,proto,canonname,sockaddr = addrinfo[0] | |
print af, socktype, proto, canonname, sockaddr | |
self.stream = IOStream(socket.socket(af,socktype,proto), | |
io_loop=self.io_loop,max_buffer_size=2500) | |
self.stream.connect(sockaddr,self._on_connect) | |
def _on_connect(self): | |
self.stream.write(self.request.data) | |
# FIXME 这里应该根据打开的udp端口,做一个tcp的listen来保证端口打开后不会消失? | |
# self.stream.read_until('\r\n\r\n',self._on_response) | |
def _on_response(self,data): | |
if self.release_callback is not None: | |
release_callback = self.release_callback | |
self.release_callback = None | |
release_callback() | |
if self.final_callback: | |
self.final_callback() | |
self.stream.close() | |
class AsyncUDPClient(object): | |
def __init__(self, io_loop=None): | |
self.io_loop = io_loop or IOLoop.instance() | |
self.max_clients = 10 | |
self.queue = collections.deque() | |
self.active = {} | |
self.max_buffer_size = 2500 | |
def fetch(self,request,callback,**kwargs): | |
callback = stack_context.wrap(callback) | |
self.queue.append((request,callback)) | |
self._process_queue() | |
def _process_queue(self): | |
with stack_context.NullContext(): | |
while self.queue and len(self.active) < self.max_clients: | |
request, callback = self.queue.popleft() | |
key = object() | |
self.active[key] = (request,callback) | |
_UDPConnection(self.io_loop,self,request, | |
functools.partial(self._release_fetch,key), | |
callback, | |
self.max_buffer_size) | |
def _release_fetch(self,key): | |
del self.active[key] | |
self._process_queue() | |
def print_hello(): | |
print("hello world") | |
http_client = AsyncUDPClient() | |
# while 1: | |
request = UDPRequest("127.0.0.1", 8888, "hehe\r\n\r\n") | |
response = http_client.fetch(request, print_hello) | |
IOLoop.instance().start() | |
# print response |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
"""A non-blocking, single-threaded TCP server.""" | |
from __future__ import absolute_import, division, print_function, with_statement | |
import errno | |
import os | |
import socket | |
import ssl | |
from tornado.log import app_log | |
from tornado.ioloop import IOLoop | |
from tornado.iostream import IOStream, SSLIOStream | |
from tornado.netutil import ssl_wrap_socket | |
from tornado import process | |
from tornado.netutil import set_close_exec | |
class UDPServer(object): | |
def __init__(self, io_loop=None): | |
self.io_loop = io_loop | |
self._sockets = {} # fd -> socket object | |
self._pending_sockets = [] | |
self._started = False | |
def add_sockets(self, sockets): | |
if self.io_loop is None: | |
self.io_loop = IOLoop.instance() | |
for sock in sockets: | |
self._sockets[sock.fileno()] = sock | |
add_accept_handler(sock, self._on_recive, | |
io_loop=self.io_loop) | |
def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=25): | |
sockets = bind_sockets(port, address=address, family=family, | |
backlog=backlog) | |
if self._started: | |
self.add_sockets(sockets) | |
else: | |
self._pending_sockets.extend(sockets) | |
def start(self, num_processes=1): | |
assert not self._started | |
self._started = True | |
if num_processes != 1: | |
process.fork_processes(num_processes) | |
sockets = self._pending_sockets | |
self._pending_sockets = [] | |
self.add_sockets(sockets) | |
def stop(self): | |
for fd, sock in self._sockets.iteritems(): | |
self.io_loop.remove_handler(fd) | |
sock.close() | |
def _on_recive(self, data, address): | |
print(data) | |
host = address[0] | |
port = address[1] | |
sock = socket.socket( | |
socket.AF_INET, socket.SOCK_STREAM) | |
sock.connect((host, port)) | |
sock.send("abcde\r\n\r\n") | |
def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=25): | |
sockets = [] | |
if address == "": | |
address = None | |
flags = socket.AI_PASSIVE | |
if hasattr(socket, "AI_ADDRCONFIG"): | |
flags |= socket.AI_ADDRCONFIG | |
for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_DGRAM, | |
0, flags)): | |
af, socktype, proto, canonname, sockaddr = res | |
sock = socket.socket(af, socktype, proto) | |
set_close_exec(sock.fileno()) | |
if os.name != 'nt': | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
if af == socket.AF_INET6: | |
if hasattr(socket, "IPPROTO_IPV6"): | |
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) | |
sock.setblocking(0) | |
sock.bind(sockaddr) | |
sockets.append(sock) | |
return sockets | |
if hasattr(socket, 'AF_UNIX'): | |
def bind_unix_socket(file, mode=0600, backlog=128): | |
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) | |
set_close_exec(sock.fileno()) | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.setblocking(0) | |
try: | |
st = os.stat(file) | |
except OSError, err: | |
if err.errno != errno.ENOENT: | |
raise | |
else: | |
if st.S_ISSOCK(st.st_mode): | |
os.remove(file) | |
else: | |
raise ValueError("File %s exists and is not a socket", file) | |
sock.bind(file) | |
os.chmod(file, mode) | |
sock.listen(backlog) | |
return sock | |
def add_accept_handler(sock, callback, io_loop=None): | |
if io_loop is None: | |
io_loop = IOLoop.instance() | |
def accept_handler(fd, events): | |
while True: | |
try: | |
data, address = sock.recvfrom(2500) | |
except socket.error, e: | |
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): | |
return | |
raise | |
callback(data, address) | |
io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
from tornado_udp import UDPServer | |
from tornado.ioloop import IOLoop | |
server = UDPServer() | |
server.bind(8888) | |
server.start(0) | |
IOLoop.instance().start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
bug在哪里?