Skip to content

Instantly share code, notes, and snippets.

@romanvm
Last active March 18, 2018 14:12
Show Gist options
  • Save romanvm/e3c5124f0d5a7a7ee3f1db88126ecb93 to your computer and use it in GitHub Desktop.
Save romanvm/e3c5124f0d5a7a7ee3f1db88126ecb93 to your computer and use it in GitHub Desktop.
Single-threaded asynchronous WSGI server based on asyncore module
# coding: utf-8
# Author: Roman Miroshnychenko aka Roman V.M.
# E-mail: [email protected]
# License: MIT https://opensource.org/licenses/MIT
"""
Single-threaded asynchronous WSGI server
Example::
from wsgiref.simple_server import demo_app
from asyncore_wsgi import make_server
httpd = make_server('', 8000, demo_app)
httpd.serve_forever()
The server in the preceding example serves a demo WSGI app from
the Standard Library.
"""
import asyncore
import logging
import select
import socket
from io import BytesIO
from shutil import copyfileobj
from tempfile import TemporaryFile
from wsgiref.simple_server import WSGIServer, ServerHandler, WSGIRequestHandler
# Names exported by ``from <module> import *``.
__all__ = ['AsyncWsgiHandler', 'AsyncWsgiServer', 'make_server']
# Module version; also embedded in AsyncWsgiHandler.server_version.
__version__ = '0.0.3'
# NOTE: this configures the root logger as a side effect of importing the
# module, with the level set to DEBUG.
logging.basicConfig(
format='%(asctime)s: %(module)s:%(lineno)d - %(levelname)s - %(message)s',
level=logging.DEBUG
)
def get_poll_func():
    """Select the asyncore polling routine for the current platform

    ``asyncore.poll2`` is chosen when the ``select`` module provides
    ``poll``; otherwise ``asyncore.poll`` is used.

    :return: poller function
    """
    return asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll
class AsyncWsgiHandler(asyncore.dispatcher, WSGIRequestHandler):
    """
    Asynchronous WSGI request handler

    One instance wraps one accepted connection and is registered in the
    asyncore socket map passed to the constructor. A read event parses
    the request line and headers and copies the request body from
    ``rfile`` into a buffer; the following write event runs the WSGI
    application against that buffer and writes the response.
    """
    accepting = False
    server_version = 'AsyncWsgiServer/' + __version__
    protocol_version = 'HTTP/1.1'
    # Requests declaring a larger Content-Length are rejected with 413.
    max_input_content_length = 1024 * 1024 * 1024
    # When False, per-request log lines from log_message() are suppressed.
    verbose_logging = False

    def __init__(self, request, client_address, server, map):
        """
        :param request: connected client socket
        :param client_address: address of the client
        :param server: server object that provides ``get_app()``
        :param map: asyncore socket map to register this handler in
        """
        self._can_read = True
        self._can_write = False
        # Set before dispatcher.__init__ so that later lookups never fall
        # through to dispatcher.__getattr__.
        self._input_stream = None
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        asyncore.dispatcher.__init__(self, request, map)

    def log_message(self, format, *args):
        """Log a request message only if :attr:`verbose_logging` is set"""
        if self.verbose_logging:
            WSGIRequestHandler.log_message(self, format, *args)

    def readable(self):
        """Tell asyncore whether the handler waits for a request"""
        return self._can_read

    def writable(self):
        """Tell asyncore whether a parsed request awaits a response"""
        return self._can_write

    @staticmethod
    def _parse_content_length(value):
        """Return *value* as a non-negative int or ``None`` if malformed"""
        try:
            length = int(value)
        except (TypeError, ValueError):
            return None
        return length if length >= 0 else None

    def _release_input_stream(self):
        """Close the request body buffer, if one is currently held"""
        stream, self._input_stream = self._input_stream, None
        if stream is not None:
            stream.close()

    def handle_read(self):
        """
        Parse an incoming request and buffer its body

        Error responses sent from here: 414 for an over-long request line,
        411 for a POST/PUT/PATCH without Content-Length, 400 for a
        malformed or negative Content-Length and 413 for a body larger
        than :attr:`max_input_content_length`. The connection is closed
        after any of them.
        """
        self._can_read = False
        try:
            self.raw_requestline = self.rfile.readline(65537)
        except Exception:
            self.handle_error()
            return
        if len(self.raw_requestline) > 65536:
            self.requestline = ''
            self.request_version = ''
            self.command = ''
            self.send_error(414)
            self.handle_close()
            return
        if not self.parse_request():
            self.handle_close()
            return
        # Drop the buffer of the previous request on a kept-alive connection.
        self._release_input_stream()
        self._input_stream = BytesIO()
        if self.command.lower() in ('post', 'put', 'patch'):
            raw_length = self.headers.get('content-length')
            if raw_length is None:
                self.send_error(411)
                self.handle_close()
                return
            cont_length = self._parse_content_length(raw_length)
            if cont_length is None:
                # Answer with 400 instead of letting int() raise, which
                # would drop the connection without any response.
                self.send_error(400)
                self.handle_close()
                return
            if cont_length > self.max_input_content_length:
                self.send_error(413)
                self.handle_close()
                return
            if cont_length > 16 * 1024:
                # Bodies above 16 KiB are buffered in a temporary file
                # rather than in memory.
                self._input_stream.close()
                self._input_stream = TemporaryFile()
            copyfileobj(self.rfile, self._input_stream)
            self._input_stream.seek(0)
        self._can_write = True

    def handle_write(self):
        """
        Run the WSGI application and send its response

        The connection is closed afterwards if ``close_connection`` is
        set; otherwise the output is flushed and the handler starts
        waiting for the next request.
        """
        self._can_write = False
        handler = ServerHandler(self._input_stream, self.wfile,
                                self.get_stderr(), self.get_environ())
        handler.server_software = self.server_version
        handler.http_version = self.protocol_version[5:]
        handler.request_handler = self  # backpointer for logging
        handler.wsgi_multiprocess = False
        handler.wsgi_multithread = False
        try:
            handler.run(self.server.get_app())
        except Exception:
            self.handle_error()
            return
        if self.close_connection:
            self.handle_close()
        else:
            try:
                self.wfile.flush()
            except socket.error:
                self.handle_error()
            else:
                self._can_read = True

    def handle_error(self):
        """Log the current exception and close the connection"""
        logging.exception('Exception in {}!'.format(repr(self)))
        self.handle_close()

    def close(self):
        """Release the body buffer, the stream files and the socket"""
        self._release_input_stream()
        WSGIRequestHandler.finish(self)
        asyncore.dispatcher.close(self)
class AsyncWsgiServer(asyncore.dispatcher, WSGIServer):
    """
    Asynchronous WSGI server

    The listening socket and every accepted connection are registered in
    one asyncore socket map, which :meth:`poll_once` and
    :meth:`serve_forever` poll.
    """

    def __init__(self, server_address,
                 RequestHandlerClass=AsyncWsgiHandler,
                 map=None):
        """
        :param server_address: ``(host, port)`` pair to bind to
        :param RequestHandlerClass: class instantiated for each accepted
            connection
        :param map: asyncore socket map; a new private dict is used
            when omitted
        """
        if map is None:
            map = {}
        asyncore.dispatcher.__init__(self, map=map)
        # bind_and_activate=False: binding and listening are done below
        # through the asyncore dispatcher API.
        WSGIServer.__init__(self, server_address, RequestHandlerClass, False)
        # The base server constructor has already created a socket that
        # create_socket() is about to replace; close it so it isn't leaked.
        unused_socket = self.socket
        if unused_socket is not None:
            unused_socket.close()
        self._poll_func = get_poll_func()
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(server_address)
        self.listen(5)
        self.server_address = self.socket.getsockname()
        host, port = self.server_address[:2]
        self.server_name = socket.getfqdn(host)
        self.server_port = port
        self.setup_environ()

    def writable(self):
        """The listening socket is never polled for writing"""
        return False

    def handle_accept(self):
        """Accept a pending connection and create a handler for it"""
        try:
            pair = self.accept()
        except socket.error:
            # Use the logging module directly, as the rest of the file
            # does; no ``logger`` object is defined here.
            logging.exception('Exception when accepting a request!')
        else:
            if pair is not None:
                self.RequestHandlerClass(pair[0], pair[1], self, self._map)

    def handle_error(self, *args, **kwargs):
        """Log the current exception and close the server"""
        logging.exception('Exception in {}!'.format(repr(self)))
        self.handle_close()

    def poll_once(self, timeout=0.0):
        """
        Poll active sockets once

        This method can be used to allow aborting server polling loop
        on some condition.

        :param timeout: polling timeout
        """
        if self._map:
            self._poll_func(timeout, self._map)

    def handle_request(self):
        """Call :meth:`poll_once` with a 0.5 timeout"""
        self.poll_once(0.5)

    def serve_forever(self, poll_interval=0.5):
        """
        Start serving HTTP requests

        This method blocks the current thread until ``KeyboardInterrupt``
        or ``SystemExit`` is raised, then closes the server.

        :param poll_interval: polling timeout
        :return:
        """
        logging.info('Starting server on {}:{}...'.format(
            self.server_name, self.server_port)
        )
        while True:
            try:
                self.poll_once(poll_interval)
            except (KeyboardInterrupt, SystemExit):
                break
        self.handle_close()
        logging.info('Server stopped.')

    def close(self):
        """Close the listening socket and every channel in the map"""
        asyncore.dispatcher.close(self)
        asyncore.close_all(self._map, True)
def make_server(host, port, app,
                server_class=AsyncWsgiServer,
                handler_class=AsyncWsgiHandler):
    """
    Build a server bound to ``(host, port)`` that serves ``app``

    :param host: hostname or IP
    :type host: str
    :param port: server port
    :type port: int
    :param app: WSGI application
    :param server_class: WSGI server class, defaults to AsyncWsgiServer
    :param handler_class: WSGI handler class, defaults to AsyncWsgiHandler
    :return: initialized server instance
    """
    address = (host, port)
    server = server_class(address, RequestHandlerClass=handler_class)
    server.set_app(app)
    return server
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment