Created
April 26, 2020 17:15
-
-
Save faustomilletari/2112e8551cee401cb8b36e7d07d34b4d to your computer and use it in GitHub Desktop.
Database class for Lambda function (Medium post)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import binascii
import hashlib
import hmac
import os
from urllib.parse import quote

from sqlalchemy import create_engine
from sqlalchemy import Table, Column, String, MetaData, select, and_
def hash_passwd(passwd):
    """Derive a storable hash for *passwd* using a freshly generated salt.

    The salt is the hex SHA-256 digest of 60 bytes read from ``os.urandom``.
    The hash is PBKDF2-HMAC-SHA512 over the UTF-8 encoded password, run for
    100000 iterations.

    Returns:
        A ``(hash_hex, salt_hex)`` tuple of ASCII hex strings.
    """
    salt_hex = hashlib.sha256(os.urandom(60)).hexdigest()
    derived = hashlib.pbkdf2_hmac(
        'sha512',
        passwd.encode('utf-8'),
        # The hex text itself (ASCII-encoded) is the salt, not its decoded bytes.
        salt_hex.encode('ascii'),
        100000,
    )
    return derived.hex(), salt_hex
def hash_passwd_with_salt(passwd, salt):
    """Re-derive the hash of *passwd* using an already known *salt*.

    Runs PBKDF2-HMAC-SHA512 for 100000 iterations over the UTF-8 encoded
    password and the ASCII-encoded salt string. No comparison happens here;
    the caller compares the returned value with the stored hash.

    Returns:
        The derived key as an ASCII hex string.
    """
    derived = hashlib.pbkdf2_hmac(
        'sha512',
        passwd.encode('utf-8'),
        salt.encode('ascii'),
        100000,
    )
    return derived.hex()
class DB:
    """Data-access helper for the ``users`` table of a PostgreSQL server.

    The table has the columns ``email`` (primary key), ``name``, ``surname``,
    ``password`` (hex PBKDF2 hash) and ``salt``.
    """

    def __init__(self, user, password, host, port):
        """Create the engine and declare the ``users`` table.

        Args:
            user: Database user name, passed raw (it is URL-escaped here).
            password: Database password, passed raw (it is URL-escaped here).
            host: Database host name or address.
            port: Database port.
        """
        self.user = user
        self.password = password
        self.host = host
        self.port = port

        db_string = self._build_db_url(user, password, host, port)
        self.db = create_engine(db_string)
        # Unbound metadata: create_tables() hands the engine over explicitly,
        # and engine-bound MetaData is not available in SQLAlchemy 2.0.
        self.meta = MetaData()
        self.user_table = Table(
            'users', self.meta,
            Column('email', String, primary_key=True),
            Column('name', String),
            Column('surname', String),
            Column('password', String),
            Column('salt', String),
        )

    @staticmethod
    def _build_db_url(user, password, host, port):
        """Return the SQLAlchemy connection URL for the given credentials.

        User and password are percent-encoded so characters such as ``@``,
        ``:`` or ``/`` cannot corrupt the URL. The ``postgresql`` dialect name
        is used because newer SQLAlchemy releases reject the legacy
        ``postgres`` alias.
        """
        return "postgresql://{}:{}@{}:{}".format(
            quote(str(user), safe=""),
            quote(str(password), safe=""),
            host,
            port,
        )

    def create_tables(self):
        """Create every table declared on ``self.meta``; always returns True."""
        self.meta.create_all(self.db)
        return True

    def insert_new_user(self, user):
        """Insert a new user row, storing a salted hash of the password.

        Args:
            user: Mapping with the keys ``'email'``, ``'name'``,
                ``'surname'`` and ``'password'`` (plain text).

        Returns:
            True once the insert has been committed. Database errors raised
            by the driver propagate to the caller.
        """
        key, salt = hash_passwd(user['password'])
        insert_statement = self.user_table.insert().values(
            email=user['email'],
            name=user['name'],
            surname=user['surname'],
            password=key,
            salt=salt,
        )
        # begin() commits when the block exits cleanly. A plain connect()
        # relies on autocommit, which SQLAlchemy 2.0 no longer performs.
        with self.db.begin() as conn:
            conn.execute(insert_statement)
        return True

    def login_user(self, email, password):
        """Check *password* against the stored hash for *email*.

        Returns:
            True when the user exists and the password matches, else False.
        """
        # Table.select() works on both 1.x and 2.0, unlike select([table]).
        select_st = self.user_table.select().where(
            self.user_table.c.email == email
        )
        with self.db.connect() as conn:
            row = conn.execute(select_st).first()

        # Unknown user, or a row with no usable credentials stored.
        if row is None or row.password is None or row.salt is None:
            return False

        key = hash_passwd_with_salt(password, row.salt)
        # Constant-time comparison so the check does not leak how many leading
        # characters of the hash matched.
        # subject to JWT token release after this call
        return hmac.compare_digest(
            key.encode('utf-8'), row.password.encode('utf-8')
        )

    def remove_user(self, email):
        """Delete the user identified by *email*.

        Returns:
            True when a row was deleted, False when no such user existed.
        """
        del_st = self.user_table.delete().where(
            self.user_table.c.email == email
        )
        # One DELETE in one transaction: the affected-row count tells us
        # whether the user existed, with no separate (and racy) SELECT.
        with self.db.begin() as conn:
            deleted = conn.execute(del_st).rowcount
        return deleted > 0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment