Last active
June 21, 2022 14:42
-
-
Save aoleg94/7beb2165a14a090c9fb00e5d0f8446f2 to your computer and use it in GitHub Desktop.
nxmp-http-server.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from http.server import * | |
from http import HTTPStatus | |
import os, sys, io, socket, urllib.parse, html | |
class ApacheLikeHTTPRequestHandler(SimpleHTTPRequestHandler): | |
def list_directory(self, path): | |
"""Helper to produce a directory listing (absent index.html). | |
Return value is either a file object, or None (indicating an | |
error). In either case, the headers are sent, making the | |
interface the same as for send_head(). | |
""" | |
try: | |
list = os.listdir(path) | |
except OSError: | |
self.send_error( | |
HTTPStatus.NOT_FOUND, | |
"No permission to list directory") | |
return None | |
list.sort(key=lambda a: a.lower()) | |
r = [] | |
try: | |
displaypath = urllib.parse.unquote(self.path, | |
errors='surrogatepass') | |
except UnicodeDecodeError: | |
displaypath = urllib.parse.unquote(path) | |
displaypath = html.escape(displaypath, quote=False) | |
enc = sys.getfilesystemencoding() | |
title = 'Directory listing for %s' % displaypath | |
r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" ' | |
'"http://www.w3.org/TR/html4/strict.dtd">') | |
r.append('<html>\n<head>') | |
r.append('<meta http-equiv="Content-Type" ' | |
'content="text/html; charset=%s">' % enc) | |
r.append('<title>%s</title>\n</head>' % title) | |
r.append('<body>\n<h1>Index of %s</h1>' % displaypath) | |
r.append('<table>') | |
for name in list: | |
fullname = os.path.join(path, name) | |
displayname = linkname = name | |
# Append / for directories or @ for symbolic links | |
if os.path.isdir(fullname): | |
displayname = name + "/" | |
linkname = name + "/" | |
#if os.path.islink(fullname): | |
# displayname = name + "@" | |
# Note: a link to a directory displays with @ and links with / | |
r.append('<tr><td><a href="%s">%s</a></td></tr>' | |
% (urllib.parse.quote(linkname, | |
errors='surrogatepass'), | |
html.escape(displayname, quote=False))) | |
r.append('</table>\n</body>\n</html>\n') | |
encoded = '\n'.join(r).encode(enc, 'surrogateescape') | |
f = io.BytesIO() | |
f.write(encoded) | |
f.seek(0) | |
self.send_response(HTTPStatus.OK) | |
self.send_header("Content-type", "text/html; charset=%s" % enc) | |
self.send_header("Content-Length", str(len(encoded))) | |
self.end_headers() | |
return f | |
def _get_best_family(*address): | |
infos = socket.getaddrinfo( | |
*address, | |
type=socket.SOCK_STREAM, | |
flags=socket.AI_PASSIVE, | |
) | |
family, type, proto, canonname, sockaddr = next(iter(infos)) | |
return family, sockaddr | |
def test(HandlerClass=BaseHTTPRequestHandler, | |
ServerClass=ThreadingHTTPServer, | |
protocol="HTTP/1.0", port=8000, bind=None, user=None): | |
"""Test the HTTP request handler class. | |
This runs an HTTP server on port 8000 (or the port argument). | |
""" | |
ServerClass.address_family, addr = _get_best_family(bind, port) | |
HandlerClass.protocol_version = protocol | |
with ServerClass(addr, HandlerClass) as httpd: | |
host, port = httpd.socket.getsockname()[:2] | |
url_host = f'[{host}]' if ':' in host else host | |
print( | |
f"Serving HTTP on {host} port {port} " | |
f"(http://{url_host}:{port}/) ..." | |
) | |
if user: | |
import pwd | |
pw = pwd.getpwnam(user) | |
os.setgid(pw.pw_gid) | |
os.setuid(pw.pw_uid) | |
try: | |
httpd.serve_forever() | |
except KeyboardInterrupt: | |
print("\nKeyboard interrupt received, exiting.") | |
sys.exit(0) | |
if __name__ == '__main__': | |
import argparse | |
import contextlib | |
parser = argparse.ArgumentParser() | |
#parser.add_argument('--cgi', action='store_true', | |
# help='run as CGI server') | |
parser.add_argument('--bind', '-b', metavar='ADDRESS', | |
help='specify alternate bind address ' | |
'(default: all interfaces)') | |
parser.add_argument('--directory', '-d', default=os.getcwd(), | |
help='specify alternate directory ' | |
'(default: current directory)') | |
parser.add_argument('--user', '-u', default='nobody', | |
help='specify alternate user to drop privileges (default: nobody)') | |
parser.add_argument('port', action='store', default=8000, type=int, | |
nargs='?', | |
help='specify alternate port (default: 8000)') | |
args = parser.parse_args() | |
#if args.cgi: | |
# handler_class = CGIHTTPRequestHandler | |
#else: | |
# handler_class = SimpleHTTPRequestHandler | |
handler_class = ApacheLikeHTTPRequestHandler | |
# ensure dual-stack is not disabled; ref #38907 | |
class DualStackServer(ThreadingHTTPServer): | |
def server_bind(self): | |
# suppress exception when protocol is IPv4 | |
with contextlib.suppress(Exception): | |
self.socket.setsockopt( | |
socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) | |
return super().server_bind() | |
def finish_request(self, request, client_address): | |
self.RequestHandlerClass(request, client_address, self, | |
directory=args.directory) | |
test( | |
HandlerClass=handler_class, | |
ServerClass=DualStackServer, | |
port=args.port, | |
bind=args.bind, | |
user=args.user | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment