|
import sqlite3 |
|
import mysql.connector as mysqlconn |
|
from urllib.parse import urlparse,parse_qs |
|
from http.server import ThreadingHTTPServer,BaseHTTPRequestHandler |
|
from json import dumps |
|
|
|
DB_FILE='1.db' |
|
SQL_SERVER_TUPLE = ("localhost", 4000) |
|
SQL_USER = "root" |
|
SQL_PASS = "123456" |
|
SERVER_TUPLE = ("localhost", 8080) |
|
|
|
|
|
|
|
class DB_Handler(BaseHTTPRequestHandler): |
|
def send_headers(self,code=200,content_type='application/json'): |
|
self.send_response(code) |
|
self.send_header('Content-type', content_type) |
|
self.end_headers() |
|
def do_GET(self): |
|
#db = sqlite3.connect(DB_FILE) |
|
db = mysqlconn.connect(username=SQL_USER, password=SQL_PASS, |
|
host=SQL_SERVER_TUPLE[0], |
|
port=SQL_SERVER_TUPLE[1], |
|
database="table") |
|
cur = db.cursor() |
|
|
|
url = urlparse(self.path) |
|
path = url.path |
|
query = parse_qs(url.query) |
|
if path == "/login": |
|
try: |
|
usr = query.get('usr')[0] |
|
pwd = query.get('pwd')[0] |
|
cur.execute(f"SELECT username,permission,uid FROM user WHERE username='{usr}' AND password='{pwd}'") |
|
res = cur.fetchall() |
|
if res: |
|
self.send_headers(200) |
|
self.wfile.write(dumps({"status":"success","result": { |
|
"username": res[0][0], |
|
"permission": res[0][1], |
|
"uid": res[0][2] |
|
}}).encode()) |
|
else: |
|
self.send_headers(400) |
|
self.wfile.write(dumps({{"status":"fail"}}).encode()) |
|
except Exception as e: |
|
print(str(e)) |
|
self.send_headers(400) |
|
self.wfile.write(dumps({"error": "Bad Request"}).encode()) |
|
else: |
|
self.send_headers(404) |
|
self.wfile.write(dumps({"error": "Not Found"}).encode()) |
|
|
|
if __name__ == '__main__': |
|
srv = ThreadingHTTPServer(SERVER_TUPLE, DB_Handler) |
|
print(f"Server is running at {SERVER_TUPLE[0]}:{SERVER_TUPLE[1]}") |
|
srv.serve_forever() |