Last active
December 29, 2015 20:59
-
-
Save telendt/7727452 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""Simple static server written using an event loop.""" | |
import argparse | |
import contextlib | |
import errno | |
import functools | |
import logging | |
import mimetypes | |
import os | |
import stat | |
import sys | |
try: | |
import ssl | |
except ImportError: # pragma: no cover | |
ssl = None | |
assert sys.version >= '3.3', 'Please use Python 3.3 or higher.' | |
import asyncio | |
import aiohttp | |
import aiohttp.server | |
from urllib.parse import unquote | |
from wsgiref.handlers import format_date_time | |
@contextlib.contextmanager | |
def fd_closing(fd): | |
try: | |
yield fd | |
finally: | |
os.close(fd) | |
def wrap_errors(c): | |
@asyncio.coroutine | |
@functools.wraps(c) | |
def wrapper(*args, **kwargs): | |
try: | |
yield from c(*args, **kwargs) | |
except FileNotFoundError as e: | |
raise aiohttp.HttpErrorException(404) from e | |
except PermissionError as e: | |
raise aiohttp.HttpErrorException(403) from e | |
return wrapper | |
class StaticHttpServer(aiohttp.server.ServerHttpProtocol): | |
def wait_for_write(self, fd): | |
future = asyncio.futures.Future(loop=self._loop) | |
def writer(): | |
try: | |
self._loop.remove_writer(fd) | |
except OSError as e: | |
if e.errno != errno.EBADF: | |
raise | |
else: | |
future.set_result(None) | |
self._loop.add_writer(fd, writer) | |
return future | |
def fs_exec(self, callback, *args): | |
return self._loop.run_in_executor(None, callback, *args) | |
@asyncio.coroutine | |
@wrap_errors | |
def handle_request(self, message, payload): | |
path = unquote(message.path) | |
if not (path.isprintable() and path.startswith('/')) or '/.' in path: | |
raise aiohttp.HttpErrorException(404) | |
response = aiohttp.Response(self.transport, | |
status=200, | |
http_version=message.version) | |
with fd_closing((yield from self.fs_exec( | |
os.open, '.' + path, os.O_RDONLY))) as path_fd: | |
path_st = yield from self.fs_exec(os.fstat, path_fd) | |
if stat.S_ISDIR(path_st.st_mode): | |
if not path.endswith('/'): | |
path = path + '/' | |
raise aiohttp.HttpErrorException( | |
302, headers=(('URI', path), ('Location', path))) | |
response.add_header('Transfer-Encoding', 'chunked') | |
response.add_header('Content-type', 'text/html') | |
response.send_headers() | |
response.write(b'<ul>\n') | |
for name in (yield from self.fs_exec(os.listdir, '.' + path)): | |
if name.startswith('.'): | |
continue | |
is_dir = yield from self.fs_exec(os.path.isdir, | |
os.path.join('.' + path, name)) | |
response.write( | |
'<li><a href="{0}{1}">{0}</a></li>\n'.format( | |
name, '/' if is_dir else '').encode( | |
'ascii', 'xmlcharrefreplace')) | |
response.write(b'</ul>\r\n') | |
else: | |
mime_type = mimetypes.guess_type(path) | |
if mime_type[0]: | |
response.add_header('Content-Type', mime_type[0]) | |
response.add_header('Content-Length', str(path_st.st_size)) | |
response.add_header('Last-Modified', | |
format_date_time(path_st.st_mtime)) | |
response.send_headers() | |
# TODO: make sure headers are sent at this point | |
out_fd = response.transport._sock_fd | |
offset = 0 | |
nbytes = path_st.st_size | |
while nbytes > 0: | |
try: | |
sent = yield from self.fs_exec( | |
os.sendfile, out_fd, path_fd, offset, nbytes) | |
except BlockingIOError: | |
yield from self.wait_for_write(out_fd) | |
except InterruptedError: | |
continue | |
else: | |
offset += sent | |
nbytes -= sent | |
response.write_eof() | |
ARGS = argparse.ArgumentParser(description="Run simple http server.") | |
ARGS.add_argument( | |
'--host', action="store", dest='host', | |
default='0.0.0.0', help='Host name') | |
ARGS.add_argument( | |
'--port', action="store", dest='port', | |
default=8080, type=int, help='Port number') | |
ARGS.add_argument( | |
'--iocp', action="store_true", dest='iocp', help='Windows IOCP event loop') | |
ARGS.add_argument( | |
'--ssl', action="store_true", dest='ssl', help='Run ssl mode.') | |
ARGS.add_argument( | |
'--sslcert', action="store", dest='certfile', help='SSL cert file.') | |
ARGS.add_argument( | |
'--sslkey', action="store", dest='keyfile', help='SSL key file.') | |
def main(): | |
args = ARGS.parse_args() | |
if ':' in args.host: | |
args.host, port = args.host.split(':', 1) | |
args.port = int(port) | |
if args.iocp: | |
from asyncio import windows_events | |
sys.argv.remove('--iocp') | |
logging.info('using iocp') | |
el = windows_events.ProactorEventLoop() | |
asyncio.set_event_loop(el) | |
if args.ssl: | |
here = os.path.join(os.path.dirname(__file__), 'tests') | |
if args.certfile: | |
certfile = args.certfile or os.path.join(here, 'sample.crt') | |
keyfile = args.keyfile or os.path.join(here, 'sample.key') | |
else: | |
certfile = os.path.join(here, 'sample.crt') | |
keyfile = os.path.join(here, 'sample.key') | |
sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23) | |
sslcontext.load_cert_chain(certfile, keyfile) | |
else: | |
sslcontext = None | |
loop = asyncio.get_event_loop() | |
f = loop.create_server( | |
lambda: StaticHttpServer(debug=True, keep_alive=10), | |
args.host, args.port, ssl=sslcontext) | |
svr = loop.run_until_complete(f) | |
socks = svr.sockets | |
print('serving on', socks[0].getsockname()) | |
try: | |
loop.run_forever() | |
except KeyboardInterrupt: | |
pass | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment