Last active
September 7, 2018 14:31
-
-
Save Lord-Evil/256c9d49c036a9bffbe25a8738ee63a1 to your computer and use it in GitHub Desktop.
Template for properly multi-threaded python micro-service
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import http.server as server | |
from socketserver import ThreadingMixIn | |
from urllib.parse import unquote | |
import json | |
toJson=json.JSONEncoder() | |
listen=("0.0.0.0", 7000) | |
class ThreadedHTTPServer(ThreadingMixIn, server.HTTPServer): pass | |
class HTTPHandler(server.SimpleHTTPRequestHandler): | |
def parseQuery(self): | |
path="" | |
queryDict={} | |
if(len(self.path.split("?"))>1): | |
query="" | |
(path,query)=self.path.split("?") | |
if(len(query)>0): | |
query=unquote(query) | |
for item in query.split("&"): | |
(key,val)=item.split("=") | |
queryDict[key]=val | |
else: | |
path=self.path | |
return (path, queryDict) | |
def do_GET(self): | |
(path, queryDict)=self.parseQuery() | |
#Routs | |
router=dict() | |
router["/someAction"]=self.getSomeAction | |
if(path not in router): | |
self.talkBack({"status":"fail","message":"Route not found!"},404) | |
else: | |
try: | |
self.talkBack(router[path](queryDict)) | |
except BrokenPipeError: | |
print("Client suddenly diconnected") | |
def do_POST(self): | |
if(self.headers["Content-Type"]!="application/json"): | |
self.talkBack({"status":"fail","message":"Bad content type!"}) | |
(path, queryDict)=self.parseQuery() | |
#Routs | |
router=dict() | |
router["/someAction"]=self.postSomeAction | |
if(path not in router): | |
self.talkBack({"status":"fail","message":"Route not found!"},404) | |
else: | |
body=self.rfile.read(int(self.headers['Content-Length'])).decode("UTF-8") | |
try: | |
body=json.loads(body) | |
self.talkBack(router[path](body,queryDict)) | |
except BrokenPipeError: | |
print("Client suddenly diconnected") | |
except: | |
self.talkBack({"status":"fail","message":"Malformed json!"}) | |
def talkBack(self,data,status=200): | |
self.send_response(status) | |
self.send_header("Content-Type","application/json") | |
resp_data=toJson.encode(data).encode("UTF-8") | |
self.send_header("Content-Length",str(len(resp_data))) | |
self.end_headers() | |
self.wfile.write(resp_data) | |
#GET methods | |
def getSomeAction(self,queryDict={}): | |
return {"your_query_string":queryDict} | |
#POST methods | |
def postSomeAction(self,data,queryDict={}): | |
return {"your_query_string":queryDict, "your_data":data} | |
httpd=ThreadedHTTPServer(listen,HTTPHandler) | |
try: | |
httpd.serve_forever() | |
except KeyboardInterrupt: | |
print("Bye!") | |
httpd.server_close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment