-
-
Save BUPTGuo/007a6e589c0d2e48aac6 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
"""Simple HTTP Server With Upload. | |
This module builds on BaseHTTPServer by implementing the standard GET | |
and HEAD requests in a fairly straightforward manner. | |
""" | |
__version__ = "0.2" | |
__all__ = ["SimpleHTTPRequestHandler"] | |
__author__ = "bones7456, BUPTGuo" | |
__home_page__ = "http://luy.li/, http://buptguo.com" | |
import os | |
import posixpath | |
import BaseHTTPServer | |
import urllib | |
import cgi | |
import shutil | |
import mimetypes | |
import re | |
try: | |
from cStringIO import StringIO | |
except ImportError: | |
from StringIO import StringIO | |
class SimpleHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Simple HTTP request handler with GET/HEAD/POST commands.

    This serves files from the current directory and any of its
    subdirectories.  The MIME type for files is determined by
    calling the .guess_type() method.  It can also receive a file
    uploaded by the client in a multipart/form-data POST.

    GET and HEAD requests are identical except that the HEAD request
    omits the actual contents of the file.  A POST request stores the
    uploaded file in the directory named by the request path and
    answers with a small HTML result page.
    """

    server_version = "SimpleHTTPWithUpload/" + __version__

    def do_GET(self):
        """Serve a GET request."""
        f = self.send_head()
        if f:
            try:
                self.copyfile(f, self.wfile)
            finally:
                # Close the file even when the client drops the connection
                # in the middle of the transfer.
                f.close()

    def do_HEAD(self):
        """Serve a HEAD request."""
        f = self.send_head()
        if f:
            f.close()

    def do_POST(self):
        """Serve a POST request.

        Stores the uploaded file through deal_post_data() and replies
        with an HTML page that reports success or failure and links back
        to the referring page (or to the request path when the client
        sent no Referer header).
        """
        r, info = self.deal_post_data()
        # Single-argument call form: prints the same text whether ``print``
        # is a statement or a function.
        print("%s %s by:  %s" % (r, info, self.client_address))
        # A missing Referer header must not turn the reply into a KeyError.
        referer = self.headers.get('referer') or self.path
        f = StringIO()
        f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
        f.write("<html>\n<title>Upload Result Page</title>\n")
        f.write('<head>\n<meta charset="utf-8">\n</head>\n')
        f.write("<body>\n<h2>Upload Result Page</h2>\n")
        f.write("<hr>\n")
        if r:
            f.write("<strong>Success:</strong>")
        else:
            f.write("<strong>Failed:</strong>")
        # ``info`` embeds the client-chosen file name and ``referer`` is a
        # client header: escape both before putting them into HTML.
        f.write(cgi.escape(info))
        f.write("<br><a href=\"%s\">back</a>" % cgi.escape(referer, True))
        f.write("</body>\n</html>\n")
        length = f.tell()
        f.seek(0)
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.send_header("Content-Length", str(length))
        self.end_headers()
        try:
            self.copyfile(f, self.wfile)
        finally:
            f.close()

    def deal_post_data(self):
        """Read one uploaded file from a multipart/form-data request body.

        The body is expected to start with the boundary line, followed by
        a Content-Disposition line for the form field named "file", one
        more header line, a blank line and then the file data up to the
        next boundary line.  The file is written into the directory that
        the request path translates to; "_" is appended to the name until
        it does not clash with an existing file.

        Return a ``(ok, message)`` tuple: ``ok`` is True when the file
        was stored, and ``message`` is a human-readable description.
        """
        content_type = self.headers.get('content-type') or ''
        params = cgi.parse_header(content_type)[1]
        boundary = params.get('boundary')
        if not boundary:
            return (False, "Content-Type has no multipart boundary")
        try:
            remainbytes = int(self.headers.get('content-length'))
        except (TypeError, ValueError):
            return (False, "Missing or invalid Content-Length")
        # Delimiter lines are "--" followed by the boundary parameter.
        delimiter = '--' + boundary

        line = self.rfile.readline()
        remainbytes -= len(line)
        if not line.startswith(delimiter):
            return (False, "Content NOT begin with boundary")

        line = self.rfile.readline()
        remainbytes -= len(line)
        fn = re.findall(r'Content-Disposition.*name="file"; filename="(.*)"', line)
        if not fn or not fn[0]:
            return (False, "Can't find out file name...")
        # Keep only the final path component so a crafted name such as
        # "../../x" or "C:\\dir\\x" cannot leave the target directory.
        name = os.path.basename(fn[0].replace('\\', '/'))
        if name in ('', os.curdir, os.pardir) or '\0' in name:
            return (False, "Can't find out file name...")
        path = self.translate_path(self.path)
        fn = os.path.join(path, name)
        while os.path.exists(fn):
            fn += "_"

        # Skip the part's remaining header line and the blank separator.
        for _ in range(2):
            line = self.rfile.readline()
            remainbytes -= len(line)

        try:
            out = open(fn, 'wb')
        except IOError:
            return (False, "Can't create file to write, do you have permission to write?")
        try:
            # Stay one line behind the reader: the line break just before
            # the closing delimiter belongs to the delimiter, not the file.
            preline = self.rfile.readline()
            remainbytes -= len(preline)
            while remainbytes > 0:
                line = self.rfile.readline()
                if not line:
                    # Client went away early; without this check the loop
                    # would spin forever because remainbytes stops falling.
                    break
                remainbytes -= len(line)
                if line.startswith(delimiter):
                    preline = preline[0:-1]
                    if preline.endswith('\r'):
                        preline = preline[0:-1]
                    out.write(preline)
                    return (True, "File '%s' upload success!" % fn)
                out.write(preline)
                preline = line
        finally:
            out.close()
        return (False, "Unexpect Ends of data.")

    def send_head(self):
        """Common code for GET and HEAD commands.

        This sends the response code and MIME headers.

        Return value is either a file object (which has to be copied
        to the outputfile by the caller unless the command was HEAD,
        and must be closed by the caller under all circumstances), or
        None, in which case the caller has nothing further to do.
        """
        path = self.translate_path(self.path)
        f = None
        if os.path.isdir(path):
            if not self.path.endswith('/'):
                # redirect browser - doing basically what apache does
                self.send_response(301)
                self.send_header("Location", self.path + "/")
                self.end_headers()
                return None
            for index in "index.html", "index.htm":
                index = os.path.join(path, index)
                if os.path.exists(index):
                    path = index
                    break
            else:
                return self.list_directory(path)
        ctype = self.guess_type(path)
        try:
            # Always read in binary mode. Opening files in text mode may cause
            # newline translations, making the actual size of the content
            # transmitted *less* than the content-length!
            f = open(path, 'rb')
        except IOError:
            self.send_error(404, "File not found")
            return None
        try:
            self.send_response(200)
            self.send_header("Content-type", ctype)
            fs = os.fstat(f.fileno())
            self.send_header("Content-Length", str(fs.st_size))
            self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
            self.end_headers()
        except Exception:
            # The caller never receives the file object on this path, so it
            # has to be closed here before the error propagates.
            f.close()
            raise
        return f

    def list_directory(self, path):
        """Helper to produce a directory listing (absent index.html).

        The page also carries the upload form that posts back to the
        same URL.

        Return value is either a file object, or None (indicating an
        error).  In either case, the headers are sent, making the
        interface the same as for send_head().
        """
        try:
            entries = os.listdir(path)
        except os.error:
            self.send_error(404, "No permission to list directory")
            return None
        entries.sort(key=lambda a: a.lower())
        f = StringIO()
        displaypath = cgi.escape(urllib.unquote(self.path))
        f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
        f.write("<html>\n<title>Directory listing for %s</title>\n" % displaypath)
        f.write("<body>\n<h2>Directory listing for %s</h2>\n" % displaypath)
        f.write("<hr>\n")
        f.write("<form ENCTYPE=\"multipart/form-data\" method=\"post\">")
        f.write("<input name=\"file\" type=\"file\"/>")
        f.write("<input type=\"submit\" value=\"upload\"/></form>\n")
        f.write("<hr>\n<ul>\n")
        for name in entries:
            fullname = os.path.join(path, name)
            displayname = linkname = name
            # Append / for directories or @ for symbolic links
            if os.path.isdir(fullname):
                displayname = name + "/"
                linkname = name + "/"
            if os.path.islink(fullname):
                displayname = name + "@"
                # Note: a link to a directory displays with @ and links with /
            f.write('<li><a href="%s">%s</a>\n'
                    % (urllib.quote(linkname), cgi.escape(displayname)))
        f.write("</ul>\n<hr>\n</body>\n</html>\n")
        length = f.tell()
        f.seek(0)
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.send_header("Content-Length", str(length))
        self.end_headers()
        return f

    def translate_path(self, path):
        """Translate a /-separated PATH to the local filename syntax.

        Components that mean special things to the local file system
        (e.g. drive or directory names) are ignored.  (XXX They should
        probably be diagnosed.)
        """
        # abandon query parameters
        path = path.split('?', 1)[0]
        path = path.split('#', 1)[0]
        path = posixpath.normpath(urllib.unquote(path))
        words = [word for word in path.split('/') if word]
        path = os.getcwd()
        for word in words:
            drive, word = os.path.splitdrive(word)
            head, word = os.path.split(word)
            if word in (os.curdir, os.pardir):
                continue
            path = os.path.join(path, word)
        return path

    def copyfile(self, source, outputfile):
        """Copy all data between two file objects.

        The SOURCE argument is a file object open for reading
        (or anything with a read() method) and the DESTINATION
        argument is a file object open for writing (or
        anything with a write() method).

        The only reason for overriding this would be to change
        the block size or perhaps to replace newlines by CRLF
        -- note however that the default server uses this
        to copy binary data as well.
        """
        shutil.copyfileobj(source, outputfile)

    def guess_type(self, path):
        """Guess the type of a file.

        Argument is a PATH (a filename).

        Return value is a string of the form type/subtype,
        usable for a MIME Content-type header.

        The default implementation looks the file's extension
        up in the table self.extensions_map, using application/octet-stream
        as a default; however it would be permissible (if
        slow) to look inside the data to make a better guess.
        """
        base, ext = posixpath.splitext(path)
        if ext in self.extensions_map:
            return self.extensions_map[ext]
        ext = ext.lower()
        if ext in self.extensions_map:
            return self.extensions_map[ext]
        else:
            return self.extensions_map['']

    if not mimetypes.inited:
        mimetypes.init()  # try to read system mime.types
    extensions_map = mimetypes.types_map.copy()
    extensions_map.update({
        '': 'application/octet-stream',  # Default
        '.py': 'text/plain',
        '.c': 'text/plain',
        '.h': 'text/plain',
    })
def test(HandlerClass=SimpleHTTPRequestHandler,
         ServerClass=BaseHTTPServer.HTTPServer):
    """Hand the given handler and server classes to BaseHTTPServer.test()."""
    BaseHTTPServer.test(HandlerClass=HandlerClass, ServerClass=ServerClass)


# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    test()
Hi, BUPTGuo!
以下是对于 `TODO` 的一些想法~
以 `中文` 这两个字为例。根据这里: python3中的字符串是以Unicode编码的。如果知道字符的整数编码,还可以用十六进制这么写 `str`:
>>> '\u4e2d\u6587' #this is unicode
'中文'
以 `Unicode` 表示的 `str` 通过 `encode()` 方法可以编码为指定的 `bytes`,以便在网络上传输。
>>> '中文'.encode('utf-8')
b'\xe4\xb8\xad\xe6\x96\x87'
如果在server的根目录下建立一个叫做 `中文` 的目录,然后在浏览器中访问。通过观察Chrome的开发者工具,可以看到 `Request Header` 里面的 `url` 对应的 `中文` 是这样的:
%E4%B8%AD%E6%96%87
通过对比可以看到,`utf-8-encoding` 之后的编码的 `\x` 变成了 `%`。在这里可以看到,两者都是转义字符,只不过应用场景不一样。
另一方面,利用Chrome的开发者工具,可以看到在 `Request Headers` 里面,不管是用 `GET` 还是 `POST`,如果路径是中文,`url` 那一段就会被 `percent-encoding`。所以我觉得,这一步编码应该是浏览器做的,在服务端的终端只是把收到的 `GET` 或者 `POST` 的 `url` 打印出来了。
阅读代码后发现,不管是在 `py2` 的 `BaseHTTPServer.py` 还是在 `py3` 的 `http/server.py`,打印这行信息靠的是 `log_message()` 中用的 `sys.stderr.write()` 函数(方法)。
同时也发现,请求信息存放于 `self.requestline`,其中包含了路径信息,可以用正则表达式把路径提取出来。比如在 `do_GET()` 的最后加上这么几行:
path = re.match(r'.* /(.*)/ HTTP',self.requestline).group(1)
print(path)
至于如何实现,和 `do_GET()` 一样,我们可以重写 `log_message()` 或者调用它的 `log_request()`。后者如下:
def log_request(self, code='-', size='-'):
    """Log an accepted request with its target percent-decoded.

    The request line is split into method, target and protocol
    version; only the target is passed through urllib.parse.unquote(),
    so a path such as /%E4%B8%AD%E6%96%87/ is logged in readable form.
    A request line that does not have exactly three space-separated
    parts is logged unchanged.  self.requestline itself is left
    untouched.

    code -- response status; an object with a ``value`` attribute
            (such as an http.HTTPStatus member) is logged by that value.
    size -- response size, logged via str().
    """
    parts = self.requestline.split(' ')
    if len(parts) == 3:
        # Decode only the target; method and version stay as received.
        parts[1] = urllib.parse.unquote(parts[1])
    # Enum status codes expose the numeric code as ``.value``.
    code = getattr(code, 'value', code)
    self.log_message('"%s" %s %s', ' '.join(parts), str(code), str(size))
def do_POST(self):
    """Debug aid: print the first eight lines of the request body as UTF-8."""
    lines_to_show = 8
    while lines_to_show > 0:
        raw_line = self.rfile.readline()
        print(raw_line.decode('utf-8'))
        lines_to_show -= 1
不过感觉这个没有必要,终端输出反正没人看。。复杂了还容易出错。另外,英文路径编码前后是一样的。
2. 【ipv6支持】估计要牵涉到更底层吧,到 `BaseHTTPServer` 这一层才import了 `socket`,相当于对 `SimpleHTTPServer` 隐藏了 `socket`。从这里我们可以知道,如果要用 `ipv6`,则需要 `s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)`。
所以我们的目的就是要修改socket的参数。首先找到 `test()` 这个函数,然后找到它的参数 `ServerClass = http.server.HTTPServer`,在python目录中找到 `http` 这个文件夹中的 `server.py`,搜索 `HTTPServer`,得知它是继承了 `socketserver.TCPServer` 这个类,再去python目录下找到 `socketserver.py` 这个文件,在 `TCPServer` 这个类中可以看到 `address_family = socket.AF_INET`(第415行),也就是默认用的 `ipv4`。如果修改为 `AF_INET6`(可能需要sudo)并保存,然后在浏览器地址栏输入 `http://[::1]:8000`,就可以通过ipv6访问了(`[::1]` 是ipv6形式的localhost)。同时也注意到,ipv4仍然能够访问。(以上内容基于 `py3`,但 `py2` 类似)
还有其他测试方法:
$ ping6 xxxx%eth0:8000
$ nc -zv -6 localhost 8000
$ nc -zv -6 ::1 8000
不过话又说回来,要这么往下改就比较复杂了,失去了原来的轻便。
另外,根据你和bones7456同学的代码和思路,我写了基于python3.4的SimpleHTTPServerWithUpload,代码托管在 这里 ,
然后整个修改过程写在了 这篇文章
TODO: 点击中文目录时,终端输出为 unicode 编码,回头尝试修改
TODO: 尝试 ipv6支持