Skip to content

Instantly share code, notes, and snippets.

@telendt
Last active December 29, 2015 20:59
Show Gist options
  • Save telendt/7727452 to your computer and use it in GitHub Desktop.
Save telendt/7727452 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""Simple static server written using an event loop."""
import argparse
import contextlib
import errno
import functools
import logging
import mimetypes
import os
import stat
import sys
try:
import ssl
except ImportError: # pragma: no cover
ssl = None
assert sys.version >= '3.3', 'Please use Python 3.3 or higher.'
import asyncio
import aiohttp
import aiohttp.server
from urllib.parse import unquote
from wsgiref.handlers import format_date_time
@contextlib.contextmanager
def fd_closing(fd):
try:
yield fd
finally:
os.close(fd)
def wrap_errors(c):
@asyncio.coroutine
@functools.wraps(c)
def wrapper(*args, **kwargs):
try:
yield from c(*args, **kwargs)
except FileNotFoundError as e:
raise aiohttp.HttpErrorException(404) from e
except PermissionError as e:
raise aiohttp.HttpErrorException(403) from e
return wrapper
class StaticHttpServer(aiohttp.server.ServerHttpProtocol):
def wait_for_write(self, fd):
future = asyncio.futures.Future(loop=self._loop)
def writer():
try:
self._loop.remove_writer(fd)
except OSError as e:
if e.errno != errno.EBADF:
raise
else:
future.set_result(None)
self._loop.add_writer(fd, writer)
return future
def fs_exec(self, callback, *args):
return self._loop.run_in_executor(None, callback, *args)
@asyncio.coroutine
@wrap_errors
def handle_request(self, message, payload):
path = unquote(message.path)
if not (path.isprintable() and path.startswith('/')) or '/.' in path:
raise aiohttp.HttpErrorException(404)
response = aiohttp.Response(self.transport,
status=200,
http_version=message.version)
with fd_closing((yield from self.fs_exec(
os.open, '.' + path, os.O_RDONLY))) as path_fd:
path_st = yield from self.fs_exec(os.fstat, path_fd)
if stat.S_ISDIR(path_st.st_mode):
if not path.endswith('/'):
path = path + '/'
raise aiohttp.HttpErrorException(
302, headers=(('URI', path), ('Location', path)))
response.add_header('Transfer-Encoding', 'chunked')
response.add_header('Content-type', 'text/html')
response.send_headers()
response.write(b'<ul>\n')
for name in (yield from self.fs_exec(os.listdir, '.' + path)):
if name.startswith('.'):
continue
is_dir = yield from self.fs_exec(os.path.isdir,
os.path.join('.' + path, name))
response.write(
'<li><a href="{0}{1}">{0}</a></li>\n'.format(
name, '/' if is_dir else '').encode(
'ascii', 'xmlcharrefreplace'))
response.write(b'</ul>\r\n')
else:
mime_type = mimetypes.guess_type(path)
if mime_type[0]:
response.add_header('Content-Type', mime_type[0])
response.add_header('Content-Length', str(path_st.st_size))
response.add_header('Last-Modified',
format_date_time(path_st.st_mtime))
response.send_headers()
# TODO: make sure headers are sent at this point
out_fd = response.transport._sock_fd
offset = 0
nbytes = path_st.st_size
while nbytes > 0:
try:
sent = yield from self.fs_exec(
os.sendfile, out_fd, path_fd, offset, nbytes)
except BlockingIOError:
yield from self.wait_for_write(out_fd)
except InterruptedError:
continue
else:
offset += sent
nbytes -= sent
response.write_eof()
ARGS = argparse.ArgumentParser(description="Run simple http server.")
ARGS.add_argument(
'--host', action="store", dest='host',
default='0.0.0.0', help='Host name')
ARGS.add_argument(
'--port', action="store", dest='port',
default=8080, type=int, help='Port number')
ARGS.add_argument(
'--iocp', action="store_true", dest='iocp', help='Windows IOCP event loop')
ARGS.add_argument(
'--ssl', action="store_true", dest='ssl', help='Run ssl mode.')
ARGS.add_argument(
'--sslcert', action="store", dest='certfile', help='SSL cert file.')
ARGS.add_argument(
'--sslkey', action="store", dest='keyfile', help='SSL key file.')
def main():
args = ARGS.parse_args()
if ':' in args.host:
args.host, port = args.host.split(':', 1)
args.port = int(port)
if args.iocp:
from asyncio import windows_events
sys.argv.remove('--iocp')
logging.info('using iocp')
el = windows_events.ProactorEventLoop()
asyncio.set_event_loop(el)
if args.ssl:
here = os.path.join(os.path.dirname(__file__), 'tests')
if args.certfile:
certfile = args.certfile or os.path.join(here, 'sample.crt')
keyfile = args.keyfile or os.path.join(here, 'sample.key')
else:
certfile = os.path.join(here, 'sample.crt')
keyfile = os.path.join(here, 'sample.key')
sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
sslcontext.load_cert_chain(certfile, keyfile)
else:
sslcontext = None
loop = asyncio.get_event_loop()
f = loop.create_server(
lambda: StaticHttpServer(debug=True, keep_alive=10),
args.host, args.port, ssl=sslcontext)
svr = loop.run_until_complete(f)
socks = svr.sockets
print('serving on', socks[0].getsockname())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment