Created
April 8, 2017 01:04
-
-
Save traverseda/df04ead2877dde09016b8d6ce264a300 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uasyncio as asyncio | |
import logging, traceback | |
class Server: | |
def __init__(self,dir="srv"): | |
self.dir = dir | |
self.routes={} | |
self.textTypes=('css','js')#Html is preprocessed | |
self.imageTypes=('png','gif','jpg','ico') | |
@asyncio.coroutine | |
def serve(self,reader,writer): | |
try: | |
yield from self._serve(reader, writer) | |
except Exception as E: | |
traceback.print_exc() | |
yield from writer.awrite("HTTP/1.0 404 Not Found\r\n") | |
yield from writer.awrite('Content-Type: text/html\r\n\r\n') | |
header= open(self.dir+"/header.html", 'r') | |
body= open(self.dir+"/500.html", 'r') | |
footer= open(self.dir+"/footer.html", 'r') | |
for chunk in header.read(): | |
yield from writer.awrite(chunk) | |
for chunk in body.read(1024): | |
yield from writer.awrite(chunk) | |
yield from writer.awrite('<id="traceback" code>') | |
yield from writer.awrite(str(E)) | |
yield from writer.awrite('</code>') | |
for chunk in footer.read(1024): | |
yield from writer.awrite(chunk) | |
yield from writer.aclose() | |
@asyncio.coroutine | |
def serve_html(self,request,includeHeader=True): | |
try: | |
body= open(self.dir+request['path'].decode("utf-8"), 'r') | |
except Exception as e: | |
yield from request["_writer"].awrite("HTTP/1.0 404 Not Found\r\n") | |
request['path']="/404.html" | |
yield from self.serve_html(request,includeHeader=False) | |
return | |
header= open(self.dir+"/header.html", 'r') | |
footer= open(self.dir+"/footer.html", 'r') | |
if includeHeader: | |
yield from request["_writer"].awrite("HTTP/1.0 200 OK\r\n") | |
yield from request["_writer"].awrite('Content-Type: text/html\r\n') | |
yield from request["_writer"].awrite('\r\n') #End header | |
for chunk in header.read(): | |
yield from request["_writer"].awrite(chunk) | |
for chunk in body.read(1024): | |
yield from request["_writer"].awrite(chunk) | |
for chunk in footer.read(1024): | |
yield from request["_writer"].awrite(chunk) | |
@asyncio.coroutine | |
def serveFile(self,request,path=None,htmlPreProcessor=None): | |
if not htmlPreProcessor: | |
htmlPreProcessor=self.serve_html | |
if path: | |
request['path']=path | |
print(request) | |
request['ext'] = request['path'].split(b'.')[-1].decode("utf-8") | |
try: | |
body= open(self.dir+request['path'].decode("utf-8"), 'r') | |
except: | |
yield from request["_writer"].awrite("HTTP/1.0 404 Not Found\r\n") | |
request['path']="/404.html" | |
yield from self.serve_html(request,includeHeader=False) | |
return | |
if request['ext'] == "html": | |
yield from self.serve_html(request) | |
elif request['ext'] in self.textTypes: | |
yield from request["_writer"].awrite("HTTP/1.0 200 OK\r\n") | |
yield from request["_writer"].awrite('Content-Type: text/' + request['ext'] + '\r\n\r\n') | |
for chunk in body.read(1024): | |
yield from request["_writer"].awrite(chunk) | |
elif request['ext'] in self.imageTypes: | |
body= open(self.dir+request['path'].decode("utf-8"), 'r') | |
yield from request["_writer"].awrite("HTTP/1.0 200 OK\r\n") | |
yield from request["_writer"].awrite('Content-Type: image/' + request['ext'] + '\r\n\r\n') | |
for chunk in body.read(1024): | |
request["_writer"].s.send(chunk) | |
else: | |
yield from request["_writer"].awrite("HTTP/1.0 404 Not Found\r\n") | |
request['path']="/404.html" | |
yield from self.serve_html(request,includeHeader=False) | |
@asyncio.coroutine | |
def _serve(self,reader, writer): | |
request = yield from reader.read() | |
request = request.split(b"\r\n") | |
requestLine=request[0].split(b" ") | |
pathWithArgs = requestLine[1].split(b"?") | |
path=pathWithArgs[0] | |
args=tuple() | |
rContext= { | |
'method':requestLine[0], | |
'path':path, | |
'args':args, | |
'request_version':requestLine[2], | |
} | |
for pair in request[1:]: | |
values = pair.split(b":") | |
rContext[str(values[0])]=values[1:] | |
rContext["_reader"]=reader | |
rContext["_writer"]=writer | |
if path in self.routes.keys():#For routing custom fuctions | |
yield from self.routes[rContext['path']](rContext) | |
yield from writer.aclose() | |
else: | |
yield from self.serveFile(rContext) | |
yield from writer.aclose() | |
import logging | |
server=Server() | |
server.routes[b"/"]=lambda context: server.serveFile(context, path=b"/index.html") | |
server.routes[b"/brokenFile"]=lambda context: server.serveFile(context, path=b"/fakeFile.html") | |
server.routes[b"/brokenFunction"]=lambda context: print(context) | |
logging.basicConfig(level=logging.DEBUG) | |
loop = asyncio.get_event_loop() | |
loop.call_soon(asyncio.start_server(server.serve, "127.0.0.1", 8081)) | |
loop.run_forever() | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment