Created
February 27, 2020 18:57
-
-
Save naclcaleb/272582dea82d359845bac01a39599f0a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pymysql | |
| import bleach | |
| import uuid | |
| import bcrypt | |
| import jwt | |
| import random | |
| import datetime | |
| import time | |
| import requests | |
| import json | |
| import Helpers | |
| from services.HLEmail import HLEmail | |
| from services.FileStorage import FileStorage | |
| #Email Config | |
| email_config = Helpers.read_json_from_file("config/email_config.json") | |
| #Email service | |
| email_service = Helpers.service("email") | |
| hlemail = HLEmail(email_config["email"]) | |
| #Load secret key from file | |
| SECRET_KEY = "" | |
| with open("config/encryption_key.txt", 'r') as file: | |
| SECRET_KEY = file.read() | |
| class Auth: | |
| def __init__(self, servername, host, username, password, database): | |
| self.servername = servername | |
| self.host = host | |
| self.username = username | |
| self.password = password | |
| self.database = database | |
| #Blacklisted tokens cache | |
| self.blacklisted_tokens = [] | |
| self.SECRET_KEY = SECRET_KEY | |
| ## Load blacklisted tokens ## | |
| #Connect to the MySQL server | |
| conn = pymysql.connect(self.host, self.username, self.password, self.database, cursorclass=pymysql.cursors.DictCursor, charset='utf8mb4') | |
| cursor = conn.cursor() | |
| #Refresh blacklisted_tokens cache | |
| cursor.execute("SELECT token FROM blacklisted_tokens;") | |
| token_list = cursor.fetchall() | |
| for i in range(len(token_list)): | |
| self.blacklisted_tokens.append(token_list[i]["token"]) | |
| conn.commit() | |
| conn.close() | |
| def sign_in_with_oauth(self, provider_key, provider_name, firstname, lastname, email, profileimage): | |
| #Make a MySQL connection | |
| conn = pymysql.connect(self.host, self.username, self.password, self.database, cursorclass=pymysql.cursors.DictCursor, charset='utf8mb4') | |
| cursor = conn.cursor() | |
| if provider_key is None or provider_name is None: | |
| return '{ "error": "missing-provider" }' | |
| #Sanitize input | |
| provider_key = pymysql.escape_string( bleach.clean(provider_key) ) | |
| if provider_name not in ('apple', 'google'): | |
| conn.close() | |
| return '{ "error": "invalid-provider" }' | |
| #First check if the user exists. If not, create one | |
| cursor.execute("SELECT uid FROM oauth_accounts WHERE provider_key='{}' AND provider_name='{}';".format(provider_key, provider_name)) | |
| user = cursor.fetchone() | |
| if user != None: | |
| conn.close() | |
| return json.dumps({ | |
| "uid": user["uid"], | |
| "access": self.create_token(user["uid"]), | |
| "refresh": self.create_refresh_token(user["uid"]) | |
| }) | |
| #Check if email address has been used before | |
| cursor.execute("SELECT uid FROM users WHERE email='{}';".format(email)) | |
| user = cursor.fetchone() | |
| if user is not None: | |
| uid = user['uid'] | |
| if profileimage is None: | |
| profileimage = 'user/' + uid + '/profile/profile.png' | |
| cursor.execute( "UPDATE users SET firstname='{}', lastname='{}', profileimage='{}' WHERE uid='{}'".format(firstname, lastname, profileimage, uid) ) | |
| cursor.execute( "INSERT INTO oauth_accounts(provider_key, uid, provider_name) VALUES('{}', '{}', '{}');".format(provider_key, uid, provider_name) ) | |
| conn.commit() | |
| conn.close() | |
| return json.dumps({ | |
| "uid": uid, | |
| "access": self.create_token(user["uid"]), | |
| "refresh": self.create_refresh_token(user["uid"]) | |
| }) | |
| if firstname is None or lastname is None or email is None: | |
| return '{"error": "missing-information"}' | |
| firstname = pymysql.escape_string( bleach.clean(firstname) ) | |
| lastname = pymysql.escape_string( bleach.clean(lastname) ) | |
| email = pymysql.escape_string( bleach.clean(email) ) | |
| #Create a new user | |
| error = "" | |
| if len(firstname) == 0: | |
| error = "empty-firstname" | |
| if len(lastname) == 0: | |
| error = "empty-lastname" | |
| if len(email) == 0: | |
| error = "empty-email" | |
| if not (('@' in email) and ('.' in email)): | |
| error = "invalid-email" | |
| if error == "": | |
| #Go ahead and make an entry in the database | |
| uid = uuid.uuid1() | |
| if profileimage is None: | |
| profileimage = 'user/' + str(uid) + '/profile/profile.png' | |
| cursor.execute( "INSERT INTO users(uid, firstname, lastname, email, profileimage) VALUES('{}', '{}', '{}', '{}', '{}');".format(str(uid), firstname, lastname, email, profileimage) ) | |
| #Now, make an entry in the oauth_accounts table | |
| cursor.execute( "INSERT INTO oauth_accounts(provider_key, uid, provider_name) VALUES('{}', '{}', '{}');".format(provider_key, str(uid), provider_name) ) | |
| conn.commit() | |
| conn.close() | |
| fileStorage = FileStorage() | |
| fileStorage.set_default_profile_image(str(uid)) | |
| return json.dumps({ | |
| "uid": str(uid), | |
| "access": self.create_token(str(uid)), | |
| "refresh": self.create_refresh_token(str(uid)) | |
| }) | |
| else: | |
| conn.close() | |
| return '{"error": "' + error + '"}' | |
| #Sign up | |
| def sign_up(self, firstname, lastname, email, password, confirmpassword): | |
| #Make a MySQL connection | |
| conn = pymysql.connect(self.host, self.username, self.password, self.database, cursorclass=pymysql.cursors.DictCursor, charset='utf8mb4') | |
| cursor = conn.cursor() | |
| #Get and sanitize the input | |
| firstname = pymysql.escape_string( bleach.clean(firstname) ) | |
| lastname = pymysql.escape_string( bleach.clean(lastname) ) | |
| email = pymysql.escape_string( bleach.clean( email.lower() ) ) | |
| password = pymysql.escape_string( bleach.clean(password) ) | |
| confirmpassword = pymysql.escape_string( bleach.clean(confirmpassword) ) | |
| #Keep track of errors | |
| error = "" | |
| #Check for empty firstname, lastname, or email | |
| if len(firstname) == 0: | |
| error = "empty-first-name" | |
| if len(lastname) == 0: | |
| error = "empty-last-name" | |
| if len(email) == 0: | |
| error = "empty-email" | |
| #Check for email duplicates | |
| cursor.execute("SELECT uid FROM users WHERE email='" + email + "';") | |
| if len( cursor.fetchall() ) > 0: | |
| error = "email-already-taken" | |
| #Is the email a valid email? | |
| if not ( ("@" in email) and ("." in email) ): | |
| error = "invalid-email" | |
| #Is the password long enough? | |
| #TODO: Determine our personal specifications for passwords | |
| if len(password) < 6: | |
| error = "password-too-short" | |
| #Do the passwords match? | |
| if password != confirmpassword: | |
| error = "passwords-no-match" | |
| if error == "": | |
| #Create a new user | |
| #Generate a uid | |
| uid = uuid.uuid1() | |
| #Hash the password | |
| hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) | |
| #Insert into the database | |
| cursor.execute("INSERT INTO users(uid, firstname, lastname, email, password, profileimage, streak, bio) VALUES('" + str(uid) + "', '" + firstname + "', '" + lastname + "', '" + email + "', '" + hashed_password.decode('utf-8') + "', 'user/" + str(uid) + "/profile/profile.png', 0, '');") | |
| #Commit and close | |
| conn.commit() | |
| conn.close() | |
| #Create and return an auth token | |
| access_token = self.create_token(str(uid)) | |
| refresh_token = self.create_refresh_token(str(uid)) | |
| return '{"access": "' + access_token + '", "refresh": "' + refresh_token + '", "uid": "' + str(uid) + '"}' | |
| else: | |
| #Close the connection | |
| conn.close() | |
| return '{"error": "' + error + '"}' | |
| #Sign in | |
| def sign_in(self, email, password): | |
| #Make a connection to MySQL | |
| conn = pymysql.connect(self.host, self.username, self.password, self.database, cursorclass=pymysql.cursors.DictCursor, charset='utf8mb4') | |
| cursor = conn.cursor() | |
| #Get and sanitize the input | |
| email = pymysql.escape_string( bleach.clean(email.lower() ) ) | |
| password = pymysql.escape_string( bleach.clean(password) ) | |
| #Keep track of errors | |
| error = "" | |
| #Does a user exist with that email? | |
| cursor.execute("SELECT uid, password, banned FROM users WHERE email='" + email + "';") | |
| existingUser = cursor.fetchone() | |
| if existingUser != None: | |
| #If the password is correct... | |
| if existingUser["banned"] != True and bcrypt.checkpw(password.encode('utf-8'), existingUser["password"].encode('utf-8')): | |
| #The user is authenticated; create and return a token | |
| access_token = self.create_token( existingUser["uid"] ) | |
| refresh_token = self.create_refresh_token( existingUser["uid"] ) | |
| return '{"access": "' + access_token + '", "refresh": "' + refresh_token + '", "uid": "' + existingUser['uid'] + '"}' | |
| else: | |
| #The password is wrong | |
| error = "incorrect-email-or-password" | |
| else: | |
| error = "user-no-exist" | |
| #If the user was not authenticated, return the error | |
| return '{"error": "' + error + '"}' | |
| #Create Token | |
| def create_token(self, uid, expiration_minutes= 60 ): | |
| #Calculate time half a year in the future (approximately) | |
| current_time = datetime.datetime.now() | |
| expiration = current_time + datetime.timedelta( minutes=expiration_minutes ) #Defaults to six minutes in the future | |
| token_payload = { | |
| "iss": "highlow", | |
| "exp": time.mktime( expiration.timetuple() ), | |
| "sub": uid, | |
| "typ": "access", | |
| "iat": time.mktime( current_time.timetuple() ) | |
| } | |
| token = jwt.encode(token_payload, self.SECRET_KEY, algorithm="HS256") | |
| token = token.decode('utf-8') | |
| return token | |
| #Validate Token | |
| def validate_token(self, token, accepts_old=False): | |
| try: | |
| payload = jwt.decode(token, self.SECRET_KEY, algorithms=["HS256"]) | |
| except: | |
| return "ERROR-INVALID-TOKEN" | |
| current_timestamp = time.mktime( datetime.datetime.now().timetuple() ) | |
| if not accepts_old and payload["exp"] > current_timestamp and token not in self.blacklisted_tokens and payload["typ"] == "access": | |
| return payload["sub"] | |
| if accepts_old and token not in self.blacklisted_tokens and payload["typ"] == "access": | |
| return payload["sub"] | |
| return "ERROR-INVALID-TOKEN" | |
| #Send password reset email | |
| def send_password_reset_email(self, email): | |
| error = "" | |
| status = "success" | |
| #Clean the email | |
| email = pymysql.escape_string( bleach.clean(email) ) | |
| ## Find user with that email ## | |
| #Connect to the MySQL server | |
| conn = pymysql.connect(self.host, self.username, self.password, self.database, cursorclass=pymysql.cursors.DictCursor, charset='utf8mb4') | |
| cursor = conn.cursor() | |
| #Get the relevant user(s) | |
| cursor.execute("SELECT firstname, lastname, uid, email FROM users WHERE email='" + email + "' AND password IS NOT NULL;") | |
| user = cursor.fetchone() | |
| #Commit and close the connection | |
| conn.commit() | |
| conn.close() | |
| #Check and see if any users existed with that email | |
| if user is None: | |
| error = "user-no-exist" | |
| status = "failure" | |
| if error == "": | |
| #Create a "password reset id" token that expires in a day | |
| token = self.create_token( user["uid"], expiration_minutes= 60 * 24 ) | |
| ## Fetch the password reset email HTML and insert user information and the link we just generated ## | |
| password_reset_html = "" | |
| with open("passwordResetEmail.html", "r") as file: | |
| password_reset_html = file.read() | |
| password_reset_html = password_reset_html.format(user["firstname"], user["lastname"], 'https://' + self.servername + '/password_reset/' + token) | |
| #Send the email | |
| hlemail.send_email(user["email"], "Confirm Password Reset - High/Low", password_reset_html, email_config["password"]) | |
| return { "status": status, "error": error } | |
| #Reset password | |
| def reset_password(self, token, password, confirmpassword): | |
| #Clean the passwords | |
| password = pymysql.escape_string( bleach.clean(password) ) | |
| confirmpassword = pymysql.escape_string( bleach.clean(confirmpassword) ) | |
| #Make sure the id token is valid | |
| uid = self.validate_token(token) | |
| #Keep track of errors | |
| error = "" | |
| status = "success" | |
| if uid == "ERROR-INVALID-TOKEN": | |
| error = "ERROR-INVALID-RESET-ID" | |
| status = "failure" | |
| #Confirm the passwords match | |
| if password != confirmpassword: | |
| error = "passwords-no-match" | |
| status = "failure" | |
| if error == "": | |
| #If the passwords matched and the token is valid, go ahead and reset the password | |
| hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) | |
| #Connect to MySQL | |
| conn = pymysql.connect(self.host, self.username, self.password, self.database, cursorclass=pymysql.cursors.DictCursor, charset='utf8mb4') | |
| cursor = conn.cursor() | |
| #Update the password | |
| cursor.execute("UPDATE users SET password = '" + hashed_password.decode('utf-8') + "' WHERE uid='" + uid + "';") | |
| #Commit and close the connection | |
| conn.commit() | |
| conn.close() | |
| return { "error": error, "status": status } | |
| def blacklist_token(self, token): | |
| #Connect to the MySQL server | |
| conn = pymysql.connect(self.host, self.username, self.password, self.database, cursorclass=pymysql.cursors.DictCursor, charset='utf8mb4') | |
| cursor = conn.cursor() | |
| token = pymysql.escape_string( bleach.clean(token) ) | |
| cursor.execute("INSERT INTO blacklisted_tokens(token) VALUES('{}');".format(token)) | |
| #Refresh blacklisted_tokens cache | |
| cursor.execute("SELECT token FROM blacklisted_tokens;") | |
| token_list = cursor.fetchall() | |
| self.blacklisted_tokens = [] | |
| for i in range(len(token_list)): | |
| self.blacklisted_tokens.append(token_list[i]["token"]) | |
| conn.commit() | |
| conn.close() | |
| def create_refresh_token(self, uid): | |
| #Calculate time half a year in the future (approximately) | |
| current_time = datetime.datetime.now() | |
| expiration = current_time + datetime.timedelta(minutes=60 * 24 * 365 / 2) | |
| token_payload = { | |
| "iss": "highlow", | |
| "exp": time.mktime( expiration.timetuple() ), | |
| "sub": uid, | |
| "typ": "refresh", | |
| "iat": time.mktime( current_time.timetuple() ) | |
| } | |
| token = jwt.encode(token_payload, self.SECRET_KEY, algorithm="HS256") | |
| token = token.decode('utf-8') | |
| return token | |
| def refresh_access(self, refresh_token): | |
| #Make sure the user is already deleted | |
| conn = pymysql.connect(self.host, self.username, self.password, self.database, cursorclass=pymysql.cursors.DictCursor, charset='utf8mb4') | |
| cursor = conn.cursor() | |
| try: | |
| payload = jwt.decode(refresh_token, self.SECRET_KEY, algorithms=["HS256"]) | |
| except: | |
| return "ERROR-INVALID-REFRESH-TOKEN" | |
| current_timestamp = time.mktime( datetime.datetime.now().timetuple() ) | |
| if payload["exp"] > current_timestamp and refresh_token not in self.blacklisted_tokens and payload["typ"] == "refresh": | |
| cursor.execute("SELECT banned FROM users WHERE uid='{}';".format(payload["sub"])) | |
| user = cursor.fetchone() | |
| conn.commit() | |
| conn.close() | |
| if user != None and user["banned"]: | |
| return "ERROR-INVALID-REFRESH-TOKEN" | |
| #Create a new token and return it | |
| new_access_token = self.create_token(payload["sub"]) | |
| return new_access_token | |
| conn.commit() | |
| conn.close() | |
| return "ERROR-INVALID-REFRESH-TOKEN" | |
| def sign_up_test(self): | |
| #Make sure the user is already deleted | |
| conn = pymysql.connect(self.host, self.username, self.password, self.database, cursorclass=pymysql.cursors.DictCursor, charset='utf8mb4') | |
| cursor = conn.cursor() | |
| cursor.execute("DELETE FROM users WHERE email='[email protected]';") | |
| conn.commit() | |
| conn.close() | |
| error_messages = ["empty-first-name", "empty-last-name", "empty-email", | |
| "email-already-taken", "invalid-email", | |
| "password-too-short", "passwords-no-match"] | |
| result = self.sign_up( "Test", "Test", "[email protected]", "longpassword", "longpassword") | |
| if result.get('error') in error_messages: | |
| print("Something went wrong in the sign_up_test, the error was: " + result) | |
| else: | |
| print("Everything went fine in the sign_up_test") | |
| def sign_in_test(self): | |
| error_messages = ["user-no-exist", "incorrect-email-or-password"] | |
| result = self.sign_in( "[email protected]", "longpassword") | |
| if result.get('error') in error_messages: | |
| print("Something went wrong in the sign_in_test, the error was: " + result) | |
| else: | |
| print("Everything went fine in the sign_in_test") | |
| def validate_token_test(self): | |
| token = self.sign_in( "[email protected]", "longpassword" ) | |
| result = self.validate_token( token ) | |
| if result != "ERROR-INVALID_TOKEN": | |
| print("Everything went fine in the validate_token_test") | |
| else: | |
| print("Something went wrong in the validate_token_test, the error was: " + result) | |
| def send_password_reset_email_test(self): | |
| result = self.send_password_reset_email("[email protected]") | |
| if result == "success": | |
| print("send_password_reset_email was a success") | |
| else: | |
| print("send_password_reset_email was not a success, the error is: " + result) | |
| def reset_password_test(self): | |
| token = self.sign_in("[email protected]", "longpassword") | |
| result = self.reset_password( token , "longpassword", "longpassword") | |
| error_messages = ["ERROR-INVALID_TOKEN", "passwords-no-match"] | |
| if result in error_messages: | |
| print("Something went wrong in the reset_password_test, the error is " + result) | |
| elif result == "success": | |
| print("Everything went fine in the reset_password_test") | |
| def run_tests(self): | |
| self.sign_up_test() | |
| self.sign_in_test() | |
| self.validate_token_test() | |
| self.send_password_reset_email_test() | |
| self.reset_password_test() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment