Created
March 4, 2015 00:27
-
-
Save kamilion/94a76542132bb91fae2c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__author__ = '[email protected]' | |
######################################################################################################################## | |
## Imports | |
######################################################################################################################## | |
# Flask imports | |
from flask import g | |
# rethink imports | |
import rethinkdb as r | |
from rethinkdb.errors import RqlRuntimeError | |
# rethink configuration | |
from app.config import rdb | |
######################################################################################################################## | |
## Helper Functions | |
######################################################################################################################## | |
def default_users(database): | |
""" | |
A small function to verify RethinkDB Tables | |
Runs once on database initialization. | |
""" | |
conn = r.connect(host=rdb['host'], port=rdb['port']) | |
try: | |
db = rdb[database].split(':') | |
r.db(db[0]).table(db[1]).insert([ | |
{ | |
"active": True, | |
"admin": True, | |
"email": "[email protected]", | |
"identity_yubico": "ccccccdbildi", | |
"password": "" | |
}, { | |
"active": True, | |
"admin": True, | |
"email": "[email protected]", | |
"identity_yubico": "vvddngikvgln", | |
"password": "" | |
}, { | |
"active": True, | |
"admin": True, | |
"email": "[email protected]", | |
"identity_yubico": "ccccccbcjegt", | |
"password": "" | |
} | |
]).run(conn) | |
print("AUTHMODELDEFAULTS: STARTUP: User Table defaults initialized.") | |
except RqlRuntimeError: | |
print("AUTHMODELDEFAULTS: STARTUP: User Table already exists, won't reinitialize.") | |
finally: | |
conn.close() | |
def index_setup(database, index_name): | |
""" | |
A small function to verify RethinkDB Tables | |
Runs once on application startup. | |
""" | |
conn = r.connect(host=rdb['host'], port=rdb['port']) | |
try: | |
db = rdb[database].split(':') | |
r.db(db[0]).table(db[1]).index_create(index_name).run(conn) | |
print("AUTHMODELDEFAULTS: STARTUP: User Table initialized.") | |
except RqlRuntimeError: | |
print("AUTHMODELDEFAULTS: STARTUP: User Table exists.") | |
finally: | |
conn.close() | |
def table_setup(database): | |
""" | |
A small function to verify RethinkDB Tables | |
Runs once on application startup. | |
""" | |
conn = r.connect(host=rdb['host'], port=rdb['port']) | |
try: | |
db = rdb[database].split(':') | |
r.db(db[0]).table_create(db[1]).run(conn) | |
index_setup(database, 'email') | |
default_users(database) | |
print("AUTHMODELDEFAULTS: STARTUP: User Table initialized.") | |
except RqlRuntimeError: | |
print("AUTHMODELDEFAULTS: STARTUP: User Table exists.") | |
finally: | |
conn.close() | |
def db_setup(database): | |
""" | |
A small function to verify RethinkDB Databases. | |
Runs once on application startup. | |
""" | |
conn = r.connect(host=rdb['host'], port=rdb['port']) | |
try: | |
db = rdb[database].split(':') | |
r.db_create(db[0]).run(conn) | |
print("AUTHMODELDEFAULTS: STARTUP: User Database initialized.") | |
except RqlRuntimeError: | |
print("AUTHMODELDEFAULTS: STARTUP: User Database exists.") | |
finally: | |
conn.close() | |
table_setup(database) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
######################################################################################################################## | |
## Imports | |
######################################################################################################################## | |
# Flask imports | |
from flask import flash | |
from flask.ext.wtf import Form | |
from flask.ext.login import current_user | |
from wtforms.fields import TextField, PasswordField | |
from wtforms.validators import Required | |
# YubiCo import for quick identity splitting | |
from yubico_client.otp import OTP | |
# Our own User authentication model | |
from authmodel import User | |
######################################################################################################################## | |
## Class Definitions | |
######################################################################################################################## | |
class AuthForm(Form): | |
""" | |
A simple Email and Password authentication form. | |
Will do user lookups and provide a User object, if found, to the View. | |
""" | |
email = TextField('email', validators=[Required()]) | |
password = PasswordField('password', validators=[Required()]) | |
def __init__(self, *args, **kwargs): | |
""" | |
Retrieve a user object from a User class | |
@param args: Arguments, in order of definition in class | |
@param kwargs: Keyword based Arguments, in any order | |
""" | |
Form.__init__(self, *args, **kwargs) | |
self.user = None | |
def validate(self): | |
""" | |
Do validation of the form contents. | |
@return: A User object representing the found user, or None | |
""" | |
rv = Form.validate(self) | |
if not rv: | |
flash('A required field is empty', 'error') | |
return False | |
# We need to find a user's id by their email address. | |
user = User.get_user_from_email(self.email.data) | |
if user is None: | |
self.email.errors.append('Unknown Email Address') | |
flash('Unknown Email Address', 'error') | |
return False | |
if not user.check_password(self.password.data): | |
self.password.errors.append('Invalid password') | |
flash('Invalid password', 'error') | |
return False | |
self.user = user | |
return True | |
class YubiAuthForm(Form): | |
""" | |
A simple YubiKey authentication form. | |
Will do user lookups and provide a User object, if found, to the View. | |
""" | |
yubi_input = PasswordField('YubiKey', validators=[Required()]) | |
def __init__(self, *args, **kwargs): | |
""" | |
Retrieve a user object from a User class using a YubiKey token | |
@param args: Arguments, in order of definition in class | |
@param kwargs: Keyword based Arguments, in any order | |
""" | |
Form.__init__(self, *args, **kwargs) | |
self.user = None | |
def validate(self): | |
""" | |
Do validation of the form contents. | |
@return: A User object representing the found user, or None | |
""" | |
rv = Form.validate(self) | |
if not rv: | |
flash('A required field is empty', 'error') | |
return False | |
identity = OTP(self.yubi_input.data).device_id | |
# We need to find a user's id by their email address. | |
user = User.get_user_from_yubitoken(identity) | |
if user is None: | |
self.yubi_input.errors.append('Unknown Identity') | |
flash('Unknown Identity', 'error') | |
return False | |
if not user.check_yubitoken(self.yubi_input.data): | |
self.yubi_input.errors.append('Invalid Token') | |
flash('Invalid Token', 'error') | |
return False | |
self.user = user | |
return True | |
class RegisterForm(Form): | |
""" | |
A simple Email and Password registration form. | |
Will do user lookups and provide a User object, if found, to the View. | |
""" | |
email = TextField('email', validators=[Required()]) | |
password = PasswordField('password', validators=[Required()]) | |
def __init__(self, *args, **kwargs): | |
""" | |
Register a new a user object via a User class helper | |
@param args: Arguments, in order of definition in class | |
@param kwargs: Keyword based Arguments, in any order | |
""" | |
Form.__init__(self, *args, **kwargs) | |
self.user = None | |
def validate(self): | |
""" | |
Do validation of the form contents. | |
@return: True if the User object was successfully created, or False if it was not. | |
""" | |
rv = Form.validate(self) | |
if not rv: | |
flash('A required field is empty', 'error') | |
return False | |
# We need to find a user's id by their email address. | |
user = User.get_user_from_email(self.email.data) | |
if user is not None: | |
self.email.errors.append('Email Address Exists') | |
flash('Email Address Exists', 'error') | |
return False | |
user = User.create(self.email.data, self.password.data) | |
if user is not None: | |
self.user = user | |
return True | |
else: | |
return False | |
class PasswdForm(Form): | |
""" | |
A simple Password Change form. | |
Will do user lookups and alter the User object without returning it to the View. | |
""" | |
password = PasswordField('old password', validators=[Required()]) | |
newpassword = PasswordField('new password', validators=[Required()]) | |
chkpassword = PasswordField('chk password', validators=[Required()]) | |
def __init__(self, *args, **kwargs): | |
""" | |
Change the password of a user object via a User class helper | |
@param args: Arguments, in order of definition in class | |
@param kwargs: Keyword based Arguments, in any order | |
""" | |
Form.__init__(self, *args, **kwargs) | |
self.user = None | |
def validate(self): | |
""" | |
Do validation of the form contents. | |
@return: True if the password can be changed, or False if the operation should not continue. | |
""" | |
rv = Form.validate(self) | |
if not rv: | |
flash('A required field is empty', 'error') | |
return False | |
## We need to get the user's id from the login session. | |
user = current_user # current_user is a proxy object. | |
# We need to assign it to a local to get the reference to the real User object. | |
if user is None: # Then the user doesn't exist. | |
flash('Unknown User', 'error') | |
return False | |
if not user.check_password(self.password.data): | |
self.password.errors.append('Invalid old password') | |
flash('Invalid old password', 'error') | |
return False | |
if not self.chkpassword.data == self.newpassword.data: | |
self.newpassword.errors.append('New Password pair does not match') | |
flash('New Password pair does not match', 'error') | |
return False | |
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__author__ = '[email protected]' | |
######################################################################################################################## | |
## Imports | |
######################################################################################################################## | |
# Flask imports | |
from flask import g | |
# Flask-login imports | |
from flask.ext.login import LoginManager, UserMixin | |
# Third Party Libraries | |
import scrypt | |
import random | |
# rethink imports | |
import rethinkdb as r | |
from rethinkdb.errors import RqlRuntimeError | |
# rethink configuration | |
from app.config import rdb | |
import authdefaults | |
# Run these tasks when we're first imported by uwsgi or executed. | |
authdefaults.db_setup('userdb') | |
# pyotp imports | |
import pyotp | |
# yubikey imports | |
from yubico_client import Yubico | |
from yubico_client.otp import OTP | |
from app import config | |
yubico = Yubico(config.yubico_keys['client_id'], config.yubico_keys['secret_key']) | |
login_manager = LoginManager() | |
######################################################################################################################## | |
## Login_Manager's required User Loader function | |
######################################################################################################################## | |
@login_manager.user_loader | |
def load_user(user_uuid): | |
""" | |
Instantiate a user object from a RethinkDB document UUID | |
@param user_uuid: The RethinkDB UUID to request | |
""" | |
if user_uuid != "None": | |
try: | |
return User(user_uuid) | |
except NoSuchUUIDExists: | |
return None | |
######################################################################################################################## | |
## Utility Classes | |
######################################################################################################################## | |
class NoSuchUUIDExists(Exception): | |
""" | |
A small exception class to identify potential UUID mishaps | |
""" | |
pass | |
######################################################################################################################## | |
## User Class | |
######################################################################################################################## | |
class User(UserMixin): | |
""" | |
This class represents a user in RethinkDB. | |
A User class derived from the Flask-Login UserMixin superclass. | |
""" | |
id = None | |
email = None | |
password = None | |
admin = None | |
active = None | |
auth_yubitoken = None | |
auth_hotptoken = None | |
def __init__(self, user_uuid): | |
""" | |
Create a user object from a RethinkDB document | |
@param user_uuid: The RethinkDB UUID to request | |
""" | |
try: | |
db = rdb['userdb'].split(':') | |
results = r.db(db[0]).table(db[1]).get(user_uuid).run(g.rdb_conn) | |
except RqlRuntimeError: | |
print("AUTHMODEL: InitUser: Critical Failure: Saving Throw Failed! while looking up UUID: {}".format(user_uuid)) | |
if results is None: | |
raise NoSuchUUIDExists | |
self.id = results['id'] | |
self.email = results['email'] | |
self.active = results['active'] | |
self.admin = results['admin'] | |
self.password = results['password'] | |
try: | |
self.auth_yubitoken = results['identity_yubico'] | |
except KeyError: | |
pass | |
try: | |
self.auth_hotptoken = results['identity_hotp'] | |
except KeyError: | |
pass | |
print("AUTHMODEL: InitUser: {} Admin: {}".format(self.email, self.admin)) | |
# Overrides UserMixin | |
def is_active(self): | |
""" | |
Check if a user entry is currently an Active User | |
@return: True if the User was active, or False if the operation could not complete. | |
""" | |
return self.active | |
def check_yubitoken(self, token): | |
""" | |
Check the token of a user entry from a provided YubiCo token | |
@param token: The token to check against a YubiCo OTP | |
@return: True if the token was valid, or False if the operation could not complete. | |
""" | |
if self.auth_yubitoken is not None: | |
try: # to verify the token against yubico servers | |
y = yubico.verify(token) | |
except: # If it doesn't work, just return none. | |
y = None | |
# But if it did work, check against our stored token. | |
if y and OTP(token).device_id == self.auth_yubitoken[0:12]: | |
return True | |
return False | |
def check_hotptoken(self, token): | |
""" | |
Check the token of a user entry from a provided HOTP token | |
@param token: The token to check against a HOTP timestamp secret | |
@return: True if the token was valid, or False if the operation could not complete. | |
""" | |
if self.auth_hotptoken is not None: | |
try: | |
token = int(token) | |
except ValueError: | |
token = 0 | |
if pyotp.TOTP(self.auth_hotptoken).now() == token: | |
return True | |
return False | |
def check_password(self, input_pass): | |
""" | |
Check the password of a user entry from a provided input password | |
@param input_pass: The password to decode the secret with | |
@return: True if the password was valid, or False if the operation could not complete. | |
""" | |
try: | |
result = scrypt.decrypt( # Decode the crypto stream | |
self.password.decode('hex'), # By pulling the hexed password from the DB | |
input_pass.encode('ascii', 'ignore'), # And passing in the password | |
0.5) # Take half a second to do this. | |
print("AUTHMODEL: CheckPassword: SUCCESS InputPass: {}".format(input_pass)) | |
return True # We don't check our cookie above, but it matches. | |
except scrypt.error: | |
print("AUTHMODEL: CheckPassword: FAILED InputPass: {}".format(input_pass)) | |
return False # Because you get 'password is incorrect' if it does not. | |
def change_password(self, input_pass): | |
""" | |
Change the password of a user entry from an new password | |
@param input_pass: The password to register with RethinkDB | |
@return: True if the record was updated, or False if the operation could not complete. | |
""" | |
try: | |
new_password = scrypt.encrypt( | |
''.join(chr(random.randint(0, 255)) for i in range(64)), # Store a Random Cookie | |
input_pass.encode('ascii', 'ignore'), # Assure the password is ascii. | |
0.5 # How long should we take? | |
).encode('hex') # Store the completed password as hex | |
print("AUTHMODEL: SetNewPassword: InputPass: {}".format(input_pass)) | |
try: | |
db = rdb['userdb'].split(':') | |
r.db(db[0]).table(db[1]).get(self.id).update( | |
{"password": new_password} | |
).run(g.rdb_conn) | |
except RqlRuntimeError: | |
return False | |
return True # We don't check our cookie above, but it matches. | |
except scrypt.error: | |
return False # Because you get 'password is incorrect' if it does not. | |
# Convenience method | |
@classmethod | |
def create(cls, email, input_pass): | |
""" | |
Create a new user entry from an email address and password | |
@param email: The email address to register with RethinkDB | |
@param input_pass: The password to register with RethinkDB | |
@return: A User object instantiated from the requested email address, or None. | |
""" | |
try: # To make a crypted password | |
new_password = scrypt.encrypt( | |
''.join(chr(random.randint(0, 255)) for i in range(64)), # Store a Random Cookie | |
input_pass.encode('ascii', 'ignore'), # Assure the password is ascii. | |
0.5 # How long should we take? | |
).encode('hex') # Store the completed password as hex | |
print("AUTHMODEL: SetNewPassword: InputPass: {}".format(input_pass)) | |
except scrypt.error: | |
return None # Because you get 'password is incorrect' if it does not. | |
try: # To make the database entry with the crypted password | |
db = rdb['userdb'].split(':') | |
inserted = r.db(db[0]).table(db[1]).insert( | |
{"email": email, "password": new_password, "active": True, "admin": False } | |
).run(g.rdb_conn) | |
except RqlRuntimeError: | |
return None | |
return User(inserted['generated_keys'][0]) # We don't check our cookie above, but it matches. | |
# Convenience method | |
@classmethod | |
def get_user_from_email(cls, email): | |
""" | |
Create a user object from an email address | |
@param email: The email address to request from RethinkDB | |
@return: A User object instantiated from the requested email address, or None. | |
""" | |
if email == "None": | |
return None | |
else: | |
try: | |
db = rdb['userdb'].split(':') | |
cursor = r.db(db[0]).table(db[1]).filter( | |
{'email': email} | |
).pluck('id').run(g.rdb_conn) | |
for document in cursor: | |
return User(document['id']) | |
except RqlRuntimeError: | |
return None | |
# Convenience method | |
@classmethod | |
def get_user_from_yubitoken(cls, yubitoken): | |
""" | |
Create a user object from an yubikey token | |
@param yubitoken: The YubiCo Identity to request from RethinkDB | |
@return: A User object instantiated from the requested YubiCo Identity, or None. | |
""" | |
if yubitoken == "None": | |
return None | |
else: | |
try: | |
db = rdb['userdb'].split(':') | |
cursor = r.db(db[0]).table(db[1]).filter( # Make sure the yubico field is there. | |
lambda this_user: this_user.has_fields('identity_yubico') | |
).filter( # Just in case someone stored a full token instead of identity. | |
lambda row: row['identity_yubico'].match("^{}".format(yubitoken[0:12])) | |
).pluck('id').run(g.rdb_conn) # Return only the UUID for this record. | |
for document in cursor: | |
return User(document['id']) # Return the user object via instantiation | |
except RqlRuntimeError: | |
return None | |
def __repr__(self): | |
return '<User {} Admin: {}>'.format(self.email, self.admin) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__author__ = '[email protected]' | |
######################################################################################################################## | |
## Imports | |
######################################################################################################################## | |
# Flask imports | |
from flask import Flask, request, session, g, redirect, url_for, abort, render_template, flash | |
# Flask-login imports | |
from flask.ext.login import login_user, logout_user, current_user, login_required | |
# Flask-classy imports | |
from flask.ext.classy import FlaskView, route | |
# Flask-WTF imports | |
from authform import AuthForm, RegisterForm, PasswdForm, YubiAuthForm | |
from app.config import allow_new_users | |
######################################################################################################################## | |
## View Class | |
######################################################################################################################## | |
class AuthView(FlaskView): | |
""" | |
A simple authentication view for Flask-Classy. | |
Relies on flask-wtf forms to do user lookups and provide a User object, if found, to the View. | |
""" | |
def index(self): | |
""" | |
Display of a Flask-Login compatible Flask-WTF form for standard Email + Passwords. | |
@return: A Jinja2 Template containing a login form | |
""" | |
form = AuthForm() | |
return render_template('auth/login.html', form=form) | |
def post(self): | |
""" | |
Processing of a Flask-Login compatible Flask-WTF form | |
@return: A Jinja2 Template containing a login form, or a redirect to the index or next page. | |
""" | |
form = AuthForm() | |
if form.validate_on_submit(): | |
login_user(form.user, remember=True) | |
flash('logged in', 'success') | |
return redirect(request.args.get('next') or url_for('BaseView:index')) | |
return render_template('auth/login.html', form=form) | |
def logout(self): | |
""" | |
Processing of a Flask-Login compatible Logout method | |
@return: A redirect to the index page. | |
""" | |
logout_user() | |
return redirect(url_for('BaseView:index')) | |
def no_register(self): | |
""" | |
Displays a message preventing registering a new account. | |
@return: A Jinja2 Template containing a a static error message | |
""" | |
return render_template('auth/no_register.html') | |
@login_required | |
def new_user(self): | |
""" | |
Display of a Flask-Login compatible Flask-WTF form for registering a new account. | |
@return: A Jinja2 Template containing a registration form | |
""" | |
if not allow_new_users: | |
return render_template('auth/no_register.html') | |
else: | |
form = RegisterForm() | |
return render_template('auth/register.html', form=form) | |
@route('do_register', methods=['POST']) | |
@login_required | |
def do_register(self): | |
""" | |
Processing of a Flask-Login compatible Flask-WTF form | |
@return: A Jinja2 Template containing a registration form, or a redirect to the index or next page. | |
""" | |
if not allow_new_users: | |
return render_template('auth/no_register.html') | |
else: | |
form = RegisterForm() | |
if form.validate_on_submit(): | |
login_user(form.user, remember=True) | |
flash('registered and logged in', 'success') | |
return redirect(request.args.get('next') or url_for('BaseView:index')) | |
return render_template('auth/register.html', form=form) | |
def yubikey(self): | |
""" | |
Display of a Flask-Login compatible Flask-WTF form for YubiKey Authentication. | |
@return: A Jinja2 Template containing a form with a single YubiKey field | |
""" | |
form = YubiAuthForm() | |
return render_template('auth/yubikey.html', form=form) | |
@route('do_yubikey', methods=['POST']) | |
def do_yubikey(self): | |
""" | |
Processing of a Flask-Login compatible Flask-WTF form for YubiKey Authentication. | |
@return: A Jinja2 Template containing a YubiKey login form, or a redirect to the index or next page. | |
""" | |
form = YubiAuthForm() | |
if form.validate_on_submit(): | |
login_user(form.user, remember=True) | |
flash('logged in', 'success') | |
return redirect(request.args.get('next') or url_for('BaseView:index')) | |
return render_template('auth/yubikey.html', form=form) | |
@login_required | |
def passwd(self): | |
""" | |
Display of a Flask-Login compatible Flask-WTF form for Changing Passwords. | |
@return: A Jinja2 Template containing a passwd form | |
""" | |
form = PasswdForm() | |
return render_template('auth/passwd.html', form=form) | |
# Route decorators always come first. | |
@route('do_passwd', methods=['POST']) | |
@login_required | |
def do_passwd(self): | |
""" | |
Processing of a Flask-Login compatible Flask-WTF form for Changing Passwords. | |
@return: A Jinja2 Template containing a passwd form, or a redirect to the index or next page. | |
""" | |
form = PasswdForm() | |
if form.validate_on_submit(): | |
# Change password here | |
current_user.change_password(form.newpassword.data) | |
flash('password changed', 'success') | |
return redirect(request.args.get('next') or url_for('BaseView:index')) | |
return render_template('auth/passwd.html', form=form) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment