Forked from UniIsland/SimpleHTTPServerWithUpload.py
Last active
November 18, 2024 00:26
-
-
Save swerder/f1ef7cfb48f45370cd75fea47593de74 to your computer and use it in GitHub Desktop.
Simple Python Http Server with Upload
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""HTTP server with authentication and upload.

This module builds on SimpleHTTPRequestHandler and adds
simple (HTTP Basic) authentication, SSL/TLS,
full "multipart" (RFC 2046) handling,
multi-file upload, folder creation and file deletion,
and a table-view file listing with icon/size/date.
from bones7456: https://github.com/bones7456/bones7456/blob/master/SimpleHTTPServerWithUpload.py / https://gist.github.com/UniIsland/3346170
 - the original logic this is based on
from cpdef: https://gist.github.com/cpdef/9f4fa956ff41ca8b9902c4b329596acc
 - authentication, certificate, python3
from joonahn: https://gist.github.com/joonahn/5c21fde633bf61087fa3faea77e1f77f
 - multi upload
from Tallguy297: https://github.com/Tallguy297/SimpleHTTPServerWithUpload/blob/master/SimpleHTTPServerWithUpload.py
 - style change
from swerder: https://gist.github.com/swerder/f1ef7cfb48f45370cd75fea47593de74
 - merging all together
 - extend SimpleHTTPRequestHandler instead of BaseHTTPRequestHandler (to get if-modified support)
 - refactor html generation
 - complete refactor
 - add full "multipart/form-data" handling
 - add folder creation
 - add file deletion
Create a certificate with the following command:
    openssl req -new -x509 -keyout server.pem -out server.pem -days 365 -nodes
usage: server.py <port> <user:password> <certfile>
example: ../server.py 8443 test:topsecret ../server.pem
Access the server via the browser:
    https://ip:port
The "s" in https is important for SSL/TLS.
"""
__version__ = "0.3" | |
__all__ = ["AuthRequestHandler", "MultipartAuthRequestHandler","FileUploadRequestHandler","FileUploadRequestHandlerTableView"] | |
__author__ = "swerder" | |
__source_page__ = "https://gist.github.com/swerder/f1ef7cfb48f45370cd75fea47593de74" | |
__contributors__ = "bones7456,cpdef,joonahn,Tallguy297" | |
import os | |
import posixpath | |
import http.server | |
import urllib.request, urllib.parse, urllib.error | |
import html | |
import http | |
import shutil | |
import mimetypes | |
import re | |
from io import BytesIO | |
import ssl | |
import sys | |
import base64 | |
import time | |
from string import Template | |
class AuthRequestHandler(http.server.SimpleHTTPRequestHandler):
    """Handler for HEAD, GET and POST that enforces HTTP Basic authentication.

    Every ``do_<VERB>`` entry point first checks the ``Authorization`` header
    and, on success, delegates to ``do_<VERB>_auth`` (e.g. ``do_GET_auth``).
    Subclasses override the ``*_auth`` methods to add behaviour behind the
    login.

    The expected credentials are read from the module-level global ``key``
    (the base64 encoded ``user:password`` bytes).
    """

    server_version = "AuthRequestHandler/" + __version__
    sys_version = ""  # replaces Python/3.X.X
    sleep_after_login_failed = 10  # seconds a client is stalled after a failed login
    realm = "Login"

    def is_authenticated(self):
        """Return True when the request carries the expected Basic credentials."""
        import hmac  # local import: only this check needs it

        auth_header = self.headers.get('Authorization')
        if not auth_header:
            return False
        expected = b'Basic ' + key  # ``key`` is the module-level global
        # constant-time comparison so the response time does not leak the secret
        return hmac.compare_digest(auth_header.encode("utf-8"), expected)

    def do_AUTHHEAD(self):
        """Send the 401 response headers that ask the client to log in."""
        self.send_response(http.HTTPStatus.UNAUTHORIZED)
        self.send_header('WWW-Authenticate', f'Basic realm="{self.realm}"')
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def try_authenticate(self):
        """Check the login; answer with 401 and return False when it fails."""
        if self.is_authenticated():
            print('authenticated')
            return True
        self.server_version = ""  # don't show what I am before login
        self.do_AUTHHEAD()
        print('not authenticated')
        self.wfile.write(b'not authenticated')
        self.close_connection = True
        # slow down brute force attempts; the delay is a class attribute and
        # therefore has to be looked up through self
        time.sleep(self.sleep_after_login_failed)
        return False

    def do_HEAD_auth(self):
        """Serve a HEAD request of an authenticated client."""
        super().do_HEAD()

    def do_GET_auth(self):
        """Serve a GET request of an authenticated client."""
        super().do_GET()

    def do_POST_auth(self):
        """Serve a POST request of an authenticated client.

        SimpleHTTPRequestHandler itself has no do_POST, so unless another
        base class provides one the request is answered with 501.
        """
        parent_post = getattr(super(), 'do_POST', None)
        if parent_post is None:
            self.send_error(http.HTTPStatus.NOT_IMPLEMENTED,
                            "Unsupported method ('POST')")
        else:
            parent_post()

    def do_HEAD(self):
        """Serve a HEAD request."""
        if self.try_authenticate():
            self.do_HEAD_auth()

    def do_GET(self):
        """Serve a GET request."""
        if self.try_authenticate():
            self.do_GET_auth()

    def do_POST(self):
        """Serve a POST request."""
        if self.try_authenticate():
            self.do_POST_auth()
class PartConf:
    """State of the multipart section that is currently being parsed.

    name    -- section name ("preamble", "__header__", "epilogue" or the
               name chosen by the header handler, e.g. the form field name)
    out     -- writable binary stream receiving the section body, or None
    headers -- dict of the part headers read so far, or None
    handler -- callable invoked with this object when the section ends, or None
    """

    def __init__(self, name, out=None, headers=None, handler=None):
        self.name, self.out = name, out
        self.headers, self.handler = headers, handler
class DisplayableException(Exception):
    """Error whose message is shown to the client in the POST response page."""
class MultipartAuthRequestHandler(AuthRequestHandler):
    """Handler that parses POST requests with content-type 'multipart/*'.

    Designed for 'multipart/form-data'. Each part is streamed line by line
    into the ``out`` stream of a PartConf; when a part ends its ``handler``
    is called and the returned message is added to the response page.
    Subclasses hook in through deal_field_header / deal_field_data.
    """

    server_version = "MultipartAuthRequestHandler/" + __version__

    def generate_html_response(self, title, headContent, bodyContent, lang="en", enc="UTF-8"):
        """Return a complete html document as bytes encoded with *enc*."""
        return f"""
<!DOCTYPE html>
<html lang="{lang}">
<head>
<meta charset="{enc}">
<title>{title}</title>
{headContent}
</head>
<body>
{bodyContent}
</body>
</html>
""".encode(enc, 'surrogateescape')

    def send_html_response_header(self, htmlContent, status=http.HTTPStatus.OK, enc="UTF-8"):
        """Send status line and headers for the already encoded *htmlContent*."""
        self.send_response(status)
        self.send_header("Content-type", f"text/html; charset={enc}")
        self.send_header("Content-Length", str(len(htmlContent)))
        self.end_headers()

    def send_html_response(self, title, headContent, bodyContent, lang="en", enc="UTF-8", status=http.HTTPStatus.OK):
        """Build an html page and send it including the response headers."""
        htmlContent = self.generate_html_response(title, headContent, bodyContent, lang, enc)
        self.send_html_response_header(htmlContent, status, enc)
        self.wfile.write(htmlContent)

    def do_POST_auth(self):
        """Process the posted data and answer with a result page.

        A DisplayableException raised while processing is reported to the
        client with status 500 and the exception text as page content.
        """
        try:
            info = self.deal_post_data()
            status = http.HTTPStatus.OK
        except DisplayableException as ex:
            status = http.HTTPStatus.INTERNAL_SERVER_ERROR
            info = str(ex)
        print((status, info, "by: ", self.client_address, self.headers.get('X-Forwarded-For', '')))
        bodyContent = self.get_POST_body_html(info, status)
        self.send_html_response("post response page", "", bodyContent, status=status)

    def get_POST_body_html(self, info, status):
        """Return the html body of the POST result page (here: just *info*)."""
        return info

    def deal_post_data(self):
        """Dispatch the request body by content type; only multipart is accepted."""
        if self.headers.get_content_maintype() == "multipart":
            return self.deal_multipart_data()
        raise DisplayableException("multipart Content-Type required, e.g. multipart/form-data but is " + self.headers.get_content_type())

    def deal_multipart_data(self):
        """Read the multipart body and return the part messages joined by <br>.

        Raises DisplayableException when the boundary or Content-Length is
        missing or when the closing delimiter is never seen.
        """
        boundary = self.headers.get_boundary()
        if boundary is None:
            raise DisplayableException("Content-Type header doesn't contain boundary")
        try:
            remainbytes = int(self.headers['content-length'])
        except (TypeError, ValueError):
            raise DisplayableException("missing or invalid Content-Length header") from None
        # https://www.rfc-editor.org/rfc/rfc2046#section-5.1.1
        # a delimiter line starts with "--" + boundary, the closing one adds "--"
        delimiter = b"--" + boundary.encode()
        closeDelimiter = delimiter + b"--"
        response = []
        partConf = PartConf("preamble")
        # every line is handled one iteration late, because the line break in
        # front of a delimiter belongs to the delimiter and not to the data
        preline = None
        try:
            while remainbytes > 0:
                line = self.rfile.readline()
                if not line:
                    break  # client stopped sending before Content-Length was reached
                remainbytes -= len(line)
                if line.startswith(delimiter):
                    if partConf.out is not None:
                        # write last line of the part without its line break
                        if preline.endswith(b'\r\n'):
                            preline = preline[:-2]
                        else:  # only CR or only LF (not correct boundary logic)
                            preline = preline[:-1]
                        partConf.out.write(preline)
                    if partConf.handler:
                        message = partConf.handler(partConf)
                        if message is not None:
                            response.append(message)
                    elif partConf.out:
                        partConf.out.close()
                    if line.startswith(closeDelimiter):
                        partConf = PartConf("epilogue")
                    else:
                        partConf = PartConf("__header__", None, {})
                    preline = None
                else:
                    if partConf.out is not None:
                        partConf.out.write(preline)
                    elif partConf.name == "__header__" and preline is not None:
                        if preline in (b'\r\n', b'\n'):
                            # empty line: headers are complete, body starts
                            partConf = self.deal_multipart_header(partConf)
                        else:
                            header = [p.strip() for p in preline.decode().split(":", 1)]
                            # -1 = last, same as 1 but no error for a line without ':'
                            partConf.headers[header[0]] = header[-1]
                    preline = line
            if partConf.name != "epilogue":
                raise DisplayableException("invalid multipart, end boundary not found")
        finally:
            # an aborted parse must not leave the current part's stream open
            out = partConf.out
            if out is not None and not getattr(out, "closed", False):
                out.close()
        return "<br>".join(response)

    def split_header_value(self, header):
        """Split 'value; k1=v1; k2="v2"' into ('value', {'k1': 'v1', 'k2': 'v2'})."""
        # split on ';' only outside of double quotes so that quoted values
        # (e.g. file names) may contain a ';'
        segments = re.split(r';(?=(?:[^"]*"[^"]*")*[^"]*$)', header)
        value = segments[0].strip()
        params = {}
        for segment in segments[1:]:
            paramName, sep, paramValue = segment.partition("=")
            if not sep:
                continue  # tolerate empty segments such as a trailing ';'
            params[paramName.strip()] = paramValue.strip().strip('"')
        return value, params

    def deal_multipart_header(self, partConf):
        """Return the PartConf for the body of the part whose headers were read.

        For multipart/form-data the Content-Disposition header is validated
        and deal_field_header decides where the body goes; for any other
        multipart type the body is collected in memory.
        """
        if self.headers.get_content_type() != "multipart/form-data":
            return PartConf(None, BytesIO(), partConf.headers, self.deal_field_end)
        # https://www.rfc-editor.org/rfc/rfc7578#section-4.2
        if "Content-Disposition" not in partConf.headers:
            raise DisplayableException("missing Content-Disposition header, invalid multipart/form-data syntax")
        dispositionValue, dispositionParams = self.split_header_value(partConf.headers["Content-Disposition"])
        if dispositionValue != "form-data":
            raise DisplayableException(f"invalid Content-Disposition header, expect 'form-data' type but get '{html.escape(dispositionValue)}'")
        if not dispositionParams.get("name"):
            raise DisplayableException("invalid Content-Disposition header, need name param")
        return self.deal_field_header(dispositionParams, partConf.headers)

    def deal_field_header(self, dispositionParams, headers):
        """Return a PartConf that collects a form field's value in memory."""
        return PartConf(dispositionParams.get("name"), BytesIO(), headers, self.deal_field_end)

    def deal_field_end(self, partConf):
        """Decode the collected field value and hand it to deal_field_data."""
        # surrogateescape keeps undecodable bytes instead of failing on them
        content = partConf.out.getvalue().decode(errors="surrogateescape")
        partConf.out.close()
        return self.deal_field_data(partConf.name, content, partConf.headers)

    def deal_field_data(self, name, content, headers):
        """Return the message for a received field (html escaped, sent by client)."""
        return f"{html.escape(str(name))} = '{html.escape(content)}'"
class FileUploadRequestHandler(MultipartAuthRequestHandler):
    """HTTP request handler with GET/HEAD/POST commands.

    This serves files from the current directory and any of its
    subdirectories. The MIME type for files is determined by
    calling the .guess_type() method. It can also receive file uploads,
    folder creation and file delete requests from the client.
    The GET/HEAD requests are identical except that the HEAD
    request omits the actual contents of the file.
    """

    server_version = "FileUploadRequestHandler/" + __version__
    # feature switches: each one controls both the form in the listing and
    # whether the matching POST request is accepted
    show_upload = True
    show_create_folder = True
    show_delete_file = True

    def get_base_style_html(self):
        """Return the <style> element used by the plain directory listing."""
        return """<style type="text/css">
* {font-family: Helvetica; font-size: 16px; }
a { text-decoration: none; }
</style>
"""

    def get_POST_body_html(self, info, status):
        """Return the result page body: outcome, messages and a back link."""
        if 'referer' in self.headers:
            referer = html.escape(self.headers['referer'])
            backLink = f'<br><a href="{referer}">back</a>'
        else:
            backLink = ''
        result = "Success:" if status == http.HTTPStatus.OK else "Failed:"
        bodyContent = Template("""
<h2>Upload Result Page</h2>
<hr>
<strong>$result</strong><br>
$info
$backLink
<hr>
<small>Powered By: $by, check for new version <a href="$src" target="_blank">here</a>.</small>
<hr>
""").substitute(result=result, info=info, backLink=backLink, by=__author__ + " and " + __contributors__, src=__source_page__).strip()
        return bodyContent

    def deal_field_header(self, dispositionParams, headers):
        """Route parts that carry a filename to the upload handling."""
        if "filename" in dispositionParams:
            return self.deal_fileupload_header(dispositionParams, headers)
        return super().deal_field_header(dispositionParams, headers)

    def deal_fileupload_header(self, dispositionParams, headers):
        """Open the target file of an upload and return the PartConf writing to it."""
        if not self.show_upload:
            raise DisplayableException("disallowed upload request")
        filename = dispositionParams.get("filename")
        if not filename:
            raise DisplayableException("Can't find out file name...")
        filePath = self.get_full_path(filename)
        if filePath is None:
            raise DisplayableException("upload to folder not allowed")
        print(f"try to open {filePath}")
        try:
            out = open(filePath, 'wb')
        except OSError:
            raise DisplayableException("Can't create file to write.<br>Do you have permission to write?")
        partConf = PartConf("file", out, headers, self.deal_file_end)
        partConf.dispositionParams = dispositionParams
        return partConf

    def get_full_path(self, filename):
        """Return the path of *filename* directly inside the requested folder.

        Returns None when the name does not denote an entry of that folder.
        """
        # translate_path decodes the url and drops query/fragment
        path = self.translate_path(self.path)
        # prevent any folder char in filename
        filename = filename.split('/')[-1].split('\\')[-1]
        if filename in ("", ".", ".."):
            return None  # these would address the folder itself or its parent
        filePath = os.path.join(path, filename)
        # double check: the result has to live directly in the requested folder
        if os.path.dirname(os.path.abspath(filePath)) != os.path.abspath(path):
            return None
        return filePath

    def deal_field_data(self, name, content, headers):
        """Execute the action of a plain form field ('folder' or 'delete')."""
        if name == "folder":
            return self.deal_field_folder_data(content, headers)
        if name == "delete":
            return self.deal_field_delete_data(content, headers)
        raise DisplayableException(f"unknown field: {html.escape(str(name))} = '{html.escape(content)}'")

    def deal_field_folder_data(self, content, headers):
        """Create the folder named *content* inside the requested folder."""
        if not self.show_create_folder:
            raise DisplayableException("disallowed folder request")
        dirPath = self.get_full_path(content)
        shown = html.escape(content)  # the name is client input and ends up in html
        if not dirPath:
            raise DisplayableException(f"folder name invalid '{shown}'")
        if os.path.exists(dirPath):
            raise DisplayableException(f"folder '{shown}' already exist")
        try:
            os.mkdir(dirPath)
        except OSError as ex:
            raise DisplayableException(f"can't create folder '{shown}'") from ex
        return f"created folder: '{shown}'"

    def deal_field_delete_data(self, content, headers):
        """Delete the file named *content* inside the requested folder."""
        if not self.show_delete_file:
            raise DisplayableException("disallowed delete request")
        filePath = self.get_full_path(content)
        shown = html.escape(content)  # the name is client input and ends up in html
        if not filePath:
            raise DisplayableException(f"file name invalid '{shown}'")
        if not os.path.exists(filePath):
            raise DisplayableException(f"file does not exist '{shown}'")
        try:
            os.remove(filePath)
        except OSError as ex:  # e.g. a folder or missing permission
            raise DisplayableException(f"can't delete file '{shown}'") from ex
        return f"deleted file '{shown}'"

    def deal_file_end(self, partConf):
        """Close a finished upload and return its success message."""
        partConf.out.close()
        return html.escape(partConf.dispositionParams.get("filename")) + " uploaded"

    def list_directory(self, path):
        """Helper to produce a directory listing (absent index.html).

        Return value is either a file object, or None (indicating an
        error). In either case, the headers are sent, making the
        interface the same as for send_head().
        """
        try:
            entries = os.listdir(path)
        except OSError:
            self.send_error(
                http.HTTPStatus.NOT_FOUND,
                "No permission to list directory")
            return None
        entries.sort(key=lambda a: a.lower())
        try:
            displaypath = urllib.parse.unquote(self.path, errors='surrogatepass')
        except UnicodeDecodeError:
            displaypath = urllib.parse.unquote(path)
        displaypath = html.escape(displaypath, quote=False)
        enc = sys.getfilesystemencoding()
        htmlContent = self.list_directory_generate_html(path, entries, displaypath, enc)
        self.send_html_response_header(htmlContent, enc=enc)
        f = BytesIO()
        f.write(htmlContent)
        f.seek(0)
        return f

    def list_directory_generate_html(self, path, list, displaypath, enc):
        """Return the encoded listing page: title, enabled forms and file list."""
        fileListHtml, headContent = self.list_directory_files(path, list)
        title = f'Directory listing for {displaypath}'
        r = [f'<h1>{title}</h1>']
        if self.show_upload:
            r.append(self.get_upload_html())
        if self.show_create_folder:
            r.append(self.get_create_folder_html())
        r.append('<hr>')
        r.append(fileListHtml)
        r.append('<hr>')
        bodyContent = '\n'.join(r)
        return self.generate_html_response(title, headContent, bodyContent, enc=enc)

    def get_upload_html(self):
        """Return the html form for uploading one or more files."""
        return """
<hr>
<form ENCTYPE="multipart/form-data" method="post">
<input name="file" type="file" multiple/>
<input type="submit" value="upload"/>
</form>
"""

    def get_create_folder_html(self):
        """Return the html form for creating a folder."""
        return """
<hr>
<form ENCTYPE="multipart/form-data" method="post">
<label for="folder">new folder</label>
<input name="folder" id="folder" type="text"/>
<input type="submit" value="create"/>
</form>
"""

    def get_delete_html(self, filename):
        """Return the inline html form with a delete button for *filename*."""
        filename = html.escape(filename)
        return f"""
<form ENCTYPE="multipart/form-data" method="post" style="display: inline;">
<input name="delete" type="hidden" value="{filename}"/>
<input type="submit" value="delete"/>
</form>
"""

    def list_directory_files(self, path, list):
        """Return (html of the file list, html for the page head).

        *list* holds the entry names of the folder *path*.
        """
        r = ['<ul>']
        if path[:-1] != os.getcwd():
            r.append('<li><a href="../">Parent Directory</a></li>')
        for name in list:
            fullname = os.path.join(path, name)
            displayname = linkname = name
            deleteForm = ""  # folders never get a delete button
            # Append / for directories or @ for symbolic links
            if os.path.isdir(fullname):
                displayname = name + "/"
                linkname = name + "/"
            elif self.show_delete_file:
                deleteForm = self.get_delete_html(name)
            if os.path.islink(fullname):
                displayname = name + "@"
                # Note: a link to a directory displays with @ and links with /
            r.append('<li><a href="%s">%s</a>%s</li>'
                     % (urllib.parse.quote(linkname, errors='surrogatepass'),
                        html.escape(displayname, quote=False), deleteForm))
        r.append('</ul>')
        return '\n'.join(r), self.get_base_style_html()
def fbytes(size):
    """Return the given byte count as a human friendly Bytes/KB/MB/GB/TB string.

    Counts below 1024 are shown as whole bytes ("1 Byte", "512 Bytes"),
    larger ones with two decimals in the biggest fitting unit (1 KB = 1024
    bytes); everything from 1024 GB upwards is shown in TB.
    """
    size = float(size)
    if size < 1024:
        count = int(size)  # byte counts are whole numbers, don't print "512.0"
        return f"{count} {'Byte' if count == 1 else 'Bytes'}"
    for unit in ('KB', 'MB', 'GB'):
        size /= 1024
        if size < 1024:
            return f"{size:.2f} {unit}"
    return f"{size / 1024:.2f} TB"
class FileUploadRequestHandlerTableView(FileUploadRequestHandler):
    """FileUploadRequestHandler that lists a folder as a table.

    Each row shows an icon, the linked name, the size, the date taken from
    os.stat's st_ctime and, when enabled, a delete button.
    """

    server_version = "FileUploadRequestHandlerTableView/" + __version__
    # files with these extensions use themselves as their icon
    image_ext = ['bmp','gif','jpg','jpeg','png']
    # file extension -> css class of the icon
    ext_to_class = {
        'avi':'img-movie',
        'mpg':'img-movie',
        'idx':'img-subtitle',
        'srt':'img-subtitle',
        'sub':'img-subtitle',
        'iso':'img-iso',
    }
    # css class -> url of the icon image
    class_to_img = {
        'img-default':'',
        'img-parent':'',
        'img-dir':'',
        'img-link':'',
        'img-movie':'',
        'img-subtitle':'',
        'img-iso':'',
    }

    def list_directory_head_html_additional(self, usedClass):
        """Return the <style> element; icon rules are limited to *usedClass*."""
        img_class_templ = Template(".file-img.$clazz { background-image: url('$img')}")
        return """
<style type="text/css">
* {font-family: Helvetica; font-size: 16px; }
a { text-decoration: none; }
a:link { text-decoration: none; font-weight: bold; color: #0000ff; }
a:visited { text-decoration: none; font-weight: bold; color: #0000ff; }
a:active { text-decoration: none; font-weight: bold; color: #0000ff; }
a:hover { text-decoration: none; font-weight: bold; color: #ff0000; }
table { border-collapse: separate;}
th, td { padding:0px 10px;}
.file-size { text-align:right; font-weight: bold; color:#FF0000; }
.file-date { text-align:right; font-weight: bold; }
.file-img { width:24px; height:24px; background-size: contain; background-position: center; background-repeat: no-repeat;}
""" + \
        "\n".join(img_class_templ.substitute(clazz=clazz, img=img) for clazz,img in self.class_to_img.items() if clazz in usedClass) + \
        """
</style>
"""

    def list_directory_files(self, path, list):
        """Return (html table of the entries, html for the page head).

        *list* holds the entry names of the folder *path*.
        """
        file_html_templ = Template('<tr><td><div class="file-img $fileImgClass" $specialImgStyle></div></td><td><a href="$linkname" target="$target">$displayname</a></td><td class="file-size">$fsize</td><td class="file-date">$created_date</td>$deleteForm</tr>\n')
        img_style_templ = Template(''' style="background-image: url('$name')"''')
        r = ['<table>']
        usedClass = set()
        if path[:-1] != os.getcwd():
            fileImgClass = 'img-parent'
            r.append(file_html_templ.substitute(fileImgClass=fileImgClass, specialImgStyle="", linkname="../", target="_self", displayname="Parent Directory", fsize="", created_date="", deleteForm=''))
            usedClass.add(fileImgClass)
        for name in list:
            fileImgClass = 'img-default'
            target = '_blank'
            specialImgStyle = ""
            deleteForm = ''
            fsize = created_date = ''
            fullname = os.path.join(path, name)
            displayname = linkname = name
            # Append / for directories or @ for symbolic links
            if os.path.isdir(fullname):
                fileImgClass = 'img-dir'
                target = '_self'
                displayname = name + '/'
                linkname = name + '/'
            else:
                try:
                    info = os.stat(fullname)
                except OSError:
                    # e.g. dangling symlink: list the entry without size/date
                    pass
                else:
                    fsize = fbytes(info.st_size)
                    created_date = time.strftime("%d.%m.%Y %H:%M", time.localtime(info.st_ctime))
                if self.show_delete_file:
                    deleteForm = '<td>' + self.get_delete_html(name) + '</td>'
            if os.path.islink(fullname):
                fileImgClass = 'img-link'
                displayname = name + '@'
            ext = os.path.splitext(name)[1][1:].lower()
            if ext in self.image_ext:
                fileImgClass = ''
                # file names are client controlled: url-quote and html-escape
                # before the name is placed inside the style attribute
                imgUrl = html.escape(urllib.parse.quote(name, errors='surrogatepass'))
                specialImgStyle = img_style_templ.substitute(name=imgUrl)
            else:
                fileImgClass = self.ext_to_class.get(ext, fileImgClass)
            # Note: a link to a directory displays with @ and links with /
            r.append(file_html_templ.substitute(fileImgClass=fileImgClass, specialImgStyle=specialImgStyle, linkname=urllib.parse.quote(linkname, errors='surrogatepass'), target=target, displayname=html.escape(displayname), fsize=fsize, created_date=created_date, deleteForm=deleteForm))
            usedClass.add(fileImgClass)
        r.append('</table>')
        return '\n'.join(r), self.list_directory_head_html_additional(usedClass)
def run(HandlerClass = FileUploadRequestHandlerTableView,
        ServerClass = http.server.HTTPServer,
        port = 8000,
        protocol = "HTTP/1.0",
        bind = ""):
    """Run a TLS wrapped HTTP server on port 8000 (or the port argument).

    The certificate (and private key) file is taken from the module-level
    global ``certfile``. Serves until interrupted with Ctrl+C, then closes
    the server and exits the process with status 0.
    """
    server_address = (bind, port)
    HandlerClass.protocol_version = protocol
    httpd = ServerClass(server_address, HandlerClass)
    # ssl.wrap_socket() was removed in Python 3.12, use a server side context
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(certfile=certfile)  # ``certfile`` is the module-level global
    httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
    sa = httpd.socket.getsockname()
    print("Serving HTTPS on", sa[0], "port", sa[1], "...")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\nKeyboard interrupt received, exiting.")
        httpd.server_close()
        sys.exit(0)
if __name__ == '__main__':
    # all three arguments are required: the certificate is read from argv[3]
    if len(sys.argv) < 4:
        print(sys.argv[0] + " <port> <user>:<pw> <cert.pem>")
        sys.exit(1)
    port = int(sys.argv[1])
    # module globals read by AuthRequestHandler.is_authenticated and run()
    key = base64.b64encode(sys.argv[2].encode("utf-8"))
    certfile = sys.argv[3]
    run(port=port)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The SSL server could be made optional via a command-line argument and disabled by default.