Skip to content

Instantly share code, notes, and snippets.

@kamilion
Created March 4, 2015 00:27
Show Gist options
  • Save kamilion/94a76542132bb91fae2c to your computer and use it in GitHub Desktop.
Save kamilion/94a76542132bb91fae2c to your computer and use it in GitHub Desktop.
__author__ = '[email protected]'
########################################################################################################################
## Imports
########################################################################################################################
# Flask imports
from flask import g
# rethink imports
import rethinkdb as r
from rethinkdb.errors import RqlRuntimeError
# rethink configuration
from app.config import rdb
########################################################################################################################
## Helper Functions
########################################################################################################################
def default_users(database):
    """
    Insert the built-in administrator records into the configured user table.
    Called by table_setup() right after the table has been created.

    @param database: Key into the ``rdb`` config; its value is split on ':' into (database name, table name)
    """
    # NOTE(review): every seed account is an active admin with an empty password string.
    seed_accounts = [
        {
            "active": True,
            "admin": True,
            "email": "[email protected]",
            "identity_yubico": "ccccccdbildi",
            "password": ""
        }, {
            "active": True,
            "admin": True,
            "email": "[email protected]",
            "identity_yubico": "vvddngikvgln",
            "password": ""
        }, {
            "active": True,
            "admin": True,
            "email": "[email protected]",
            "identity_yubico": "ccccccbcjegt",
            "password": ""
        }
    ]
    connection = r.connect(host=rdb['host'], port=rdb['port'])
    try:
        location = rdb[database].split(':')
        users_table = r.db(location[0]).table(location[1])
        users_table.insert(seed_accounts).run(connection)
        print("AUTHMODELDEFAULTS: STARTUP: User Table defaults initialized.")
    except RqlRuntimeError:
        print("AUTHMODELDEFAULTS: STARTUP: User Table already exists, won't reinitialize.")
    finally:
        # The connection is private to this call, so always release it.
        connection.close()
def index_setup(database, index_name):
    """
    Create a secondary index on the configured RethinkDB table.
    Called by table_setup() right after the table has been created.

    @param database: Key into the ``rdb`` config; its value is split on ':' into (database name, table name)
    @param index_name: Name of the secondary index to create
    """
    conn = r.connect(host=rdb['host'], port=rdb['port'])
    try:
        db = rdb[database].split(':')
        r.db(db[0]).table(db[1]).index_create(index_name).run(conn)
        # Report the index (not the table) so the startup log reflects what actually happened.
        print("AUTHMODELDEFAULTS: STARTUP: User Table index '{}' created.".format(index_name))
    except RqlRuntimeError:
        # Any runtime error lands here; an already-existing index is the expected cause.
        print("AUTHMODELDEFAULTS: STARTUP: User Table index '{}' not created (it may already exist).".format(index_name))
    finally:
        # The connection is private to this call, so always release it.
        conn.close()
def table_setup(database):
    """
    Create the configured table and, when creation succeeds, build its
    'email' index and insert the default users.

    @param database: Key into the ``rdb`` config; its value is split on ':' into (database name, table name)
    """
    connection = r.connect(host=rdb['host'], port=rdb['port'])
    try:
        location = rdb[database].split(':')
        create_table = r.db(location[0]).table_create(location[1])
        create_table.run(connection)
        # Only reached when the table was just created; both helpers open their own connections.
        index_setup(database, 'email')
        default_users(database)
        print("AUTHMODELDEFAULTS: STARTUP: User Table initialized.")
    except RqlRuntimeError:
        print("AUTHMODELDEFAULTS: STARTUP: User Table exists.")
    finally:
        connection.close()
def db_setup(database):
    """
    Create the configured RethinkDB database, then hand over to table_setup().

    @param database: Key into the ``rdb`` config; its value is split on ':' into (database name, table name)
    """
    connection = r.connect(host=rdb['host'], port=rdb['port'])
    try:
        location = rdb[database].split(':')
        create_database = r.db_create(location[0])
        create_database.run(connection)
        print("AUTHMODELDEFAULTS: STARTUP: User Database initialized.")
    except RqlRuntimeError:
        print("AUTHMODELDEFAULTS: STARTUP: User Database exists.")
    finally:
        connection.close()
    # NOTE(review): runs once the database connection above is closed, whether the
    # database was created or already existed — confirm this call was not meant to
    # live inside the ``finally`` clause.
    table_setup(database)
########################################################################################################################
## Imports
########################################################################################################################
# Flask imports
from flask import flash
from flask.ext.wtf import Form
from flask.ext.login import current_user
from wtforms.fields import TextField, PasswordField
from wtforms.validators import Required
# YubiCo import for quick identity splitting
from yubico_client.otp import OTP
# Our own User authentication model
from authmodel import User
########################################################################################################################
## Class Definitions
########################################################################################################################
class AuthForm(Form):
    """
    Email + password login form.
    On successful validation the matching User object is exposed as ``self.user``.
    """
    email = TextField('email', validators=[Required()])
    password = PasswordField('password', validators=[Required()])

    def __init__(self, *args, **kwargs):
        """
        Build the form and start with no authenticated user attached.
        @param args: Positional arguments forwarded to the base Form
        @param kwargs: Keyword arguments forwarded to the base Form
        """
        super(AuthForm, self).__init__(*args, **kwargs)
        self.user = None

    def validate(self):
        """
        Run the field validators, then check the credentials against the User model.
        Every failure is both attached to the offending field and flashed.
        @return: True when the email exists and the password matches, otherwise False.
        """
        if not super(AuthForm, self).validate():
            flash('A required field is empty', 'error')
            return False

        # Resolve the account from the submitted email address.
        candidate = User.get_user_from_email(self.email.data)
        if candidate is None:
            self.email.errors.append('Unknown Email Address')
            flash('Unknown Email Address', 'error')
            return False

        password_ok = candidate.check_password(self.password.data)
        if not password_ok:
            self.password.errors.append('Invalid password')
            flash('Invalid password', 'error')
            return False

        # Hand the authenticated user over to the view.
        self.user = candidate
        return True
class YubiAuthForm(Form):
    """
    Single-field YubiKey login form.
    On successful validation the matching User object is exposed as ``self.user``.
    """
    yubi_input = PasswordField('YubiKey', validators=[Required()])

    def __init__(self, *args, **kwargs):
        """
        Build the form and start with no authenticated user attached.
        @param args: Positional arguments forwarded to the base Form
        @param kwargs: Keyword arguments forwarded to the base Form
        """
        super(YubiAuthForm, self).__init__(*args, **kwargs)
        self.user = None

    def validate(self):
        """
        Run the field validators, then check the submitted OTP against the User model.
        Every failure is both attached to the field and flashed.
        @return: True when the key identity is known and the token verifies, otherwise False.
        """
        if not super(YubiAuthForm, self).validate():
            flash('A required field is empty', 'error')
            return False

        submitted_token = self.yubi_input.data
        # The device id portion of the OTP is what users are looked up by.
        device_identity = OTP(submitted_token).device_id
        candidate = User.get_user_from_yubitoken(device_identity)
        if candidate is None:
            self.yubi_input.errors.append('Unknown Identity')
            flash('Unknown Identity', 'error')
            return False

        token_ok = candidate.check_yubitoken(submitted_token)
        if not token_ok:
            self.yubi_input.errors.append('Invalid Token')
            flash('Invalid Token', 'error')
            return False

        # Hand the authenticated user over to the view.
        self.user = candidate
        return True
class RegisterForm(Form):
    """
    Email + password registration form.
    Successful validation creates the account and exposes it as ``self.user``.
    """
    email = TextField('email', validators=[Required()])
    password = PasswordField('password', validators=[Required()])

    def __init__(self, *args, **kwargs):
        """
        Build the form and start with no created user attached.
        @param args: Positional arguments forwarded to the base Form
        @param kwargs: Keyword arguments forwarded to the base Form
        """
        super(RegisterForm, self).__init__(*args, **kwargs)
        self.user = None

    def validate(self):
        """
        Run the field validators, refuse duplicate emails, then create the user.
        Note that the account is created as a side effect of validating.
        @return: True if the User object was created, otherwise False.
        """
        if not super(RegisterForm, self).validate():
            flash('A required field is empty', 'error')
            return False

        submitted_email = self.email.data
        # An existing account with this email blocks registration.
        existing = User.get_user_from_email(submitted_email)
        if existing is not None:
            self.email.errors.append('Email Address Exists')
            flash('Email Address Exists', 'error')
            return False

        created = User.create(submitted_email, self.password.data)
        if created is None:
            return False
        self.user = created
        return True
class PasswdForm(Form):
    """
    A simple Password Change form.
    Validates the request against the logged-in user; it does not change the
    password itself and does not hand a User object back to the View.
    """
    password = PasswordField('old password', validators=[Required()])
    newpassword = PasswordField('new password', validators=[Required()])
    chkpassword = PasswordField('chk password', validators=[Required()])

    def __init__(self, *args, **kwargs):
        """
        Build the form; ``self.user`` is initialised to None and left untouched by validate().
        @param args: Arguments, in order of definition in class
        @param kwargs: Keyword based Arguments, in any order
        """
        Form.__init__(self, *args, **kwargs)
        self.user = None

    def validate(self):
        """
        Do validation of the form contents.
        @return: True if the password can be changed, or False if the operation should not continue.
        """
        rv = Form.validate(self)
        if not rv:
            flash('A required field is empty', 'error')
            return False
        # current_user is a proxy object and the proxy itself is never None, so a plain
        # assignment cannot be tested with ``is None``. Unwrap it to reach the real object.
        user = current_user._get_current_object()
        # A session without a real account (e.g. an anonymous user object) has no
        # check_password(); treat that the same as "no user" instead of crashing.
        if user is None or not hasattr(user, 'check_password'):
            flash('Unknown User', 'error')
            return False
        if not user.check_password(self.password.data):
            self.password.errors.append('Invalid old password')
            flash('Invalid old password', 'error')
            return False
        if self.chkpassword.data != self.newpassword.data:
            self.newpassword.errors.append('New Password pair does not match')
            flash('New Password pair does not match', 'error')
            return False
        return True
__author__ = '[email protected]'
########################################################################################################################
## Imports
########################################################################################################################
# Flask imports
from flask import g
# Flask-login imports
from flask.ext.login import LoginManager, UserMixin
# Third Party Libraries
import scrypt
import random
# rethink imports
import rethinkdb as r
from rethinkdb.errors import RqlRuntimeError
# rethink configuration
from app.config import rdb
import authdefaults
# Run these tasks when we're first imported by uwsgi or executed.
authdefaults.db_setup('userdb')
# pyotp imports
import pyotp
# yubikey imports
from yubico_client import Yubico
from yubico_client.otp import OTP
from app import config
yubico = Yubico(config.yubico_keys['client_id'], config.yubico_keys['secret_key'])
login_manager = LoginManager()
########################################################################################################################
## Login_Manager's required User Loader function
########################################################################################################################
@login_manager.user_loader
def load_user(user_uuid):
    """
    Flask-Login user loader: turn a stored session id back into a User.
    @param user_uuid: The RethinkDB document UUID kept in the session
    @return: The matching User, or None for the literal string "None" or an unknown UUID
    """
    # The string "None" is treated as "no user stored".
    if user_uuid == "None":
        return None
    try:
        loaded = User(user_uuid)
    except NoSuchUUIDExists:
        return None
    return loaded
########################################################################################################################
## Utility Classes
########################################################################################################################
class NoSuchUUIDExists(Exception):
    """
    Raised when a requested user UUID has no matching RethinkDB document.
    """
########################################################################################################################
## User Class
########################################################################################################################
class User(UserMixin):
    """
    This class represents a user in RethinkDB.
    A User class derived from the Flask-Login UserMixin superclass.

    Passwords are stored as the hex encoding of an scrypt-encrypted random cookie;
    a password is considered correct when it decrypts that cookie.
    """
    # Defaults for every instance; __init__ overwrites them from the stored document.
    id = None
    email = None
    password = None
    admin = None
    active = None
    auth_yubitoken = None
    auth_hotptoken = None

    def __init__(self, user_uuid):
        """
        Create a user object from a RethinkDB document
        @param user_uuid: The RethinkDB UUID to request
        @raise NoSuchUUIDExists: If no document could be loaded for the UUID
        """
        # Pre-bind so a failed query is handled as "not found" instead of an UnboundLocalError.
        results = None
        try:
            db = rdb['userdb'].split(':')
            results = r.db(db[0]).table(db[1]).get(user_uuid).run(g.rdb_conn)
        except RqlRuntimeError:
            print("AUTHMODEL: InitUser: Critical Failure: Saving Throw Failed! while looking up UUID: {}".format(user_uuid))
        if results is None:
            raise NoSuchUUIDExists
        self.id = results['id']
        self.email = results['email']
        self.active = results['active']
        self.admin = results['admin']
        self.password = results['password']
        # The second-factor identities are optional fields on the document.
        self.auth_yubitoken = results.get('identity_yubico')
        self.auth_hotptoken = results.get('identity_hotp')
        print("AUTHMODEL: InitUser: {} Admin: {}".format(self.email, self.admin))

    # Overrides UserMixin
    def is_active(self):
        """
        Check if a user entry is currently an Active User
        @return: The stored ``active`` flag of this user.
        """
        return self.active

    def check_yubitoken(self, token):
        """
        Check the token of a user entry from a provided YubiCo token
        @param token: The token to check against a YubiCo OTP
        @return: True if the token verified and belongs to this user's key, otherwise False.
        """
        if self.auth_yubitoken is None:
            return False
        try:  # to verify the token against yubico servers
            verified = yubico.verify(token)
        except Exception:  # Any verification failure counts as "not valid".
            verified = None
        # Only compare against our stored identity when verification succeeded.
        return bool(verified) and OTP(token).device_id == self.auth_yubitoken[0:12]

    def check_hotptoken(self, token):
        """
        Check the token of a user entry from a provided one-time-password token
        @param token: The token to check against the current TOTP value of the stored secret
        @return: True if the token was valid, or False if it was not or could not be parsed.
        """
        if self.auth_hotptoken is None:
            return False
        try:
            candidate = int(token)
        except (TypeError, ValueError):
            # Unparsable input can never be a valid code; do not map it onto code 0.
            return False
        # Compare as integers so this works whether pyotp returns an int or a digit string.
        return int(pyotp.TOTP(self.auth_hotptoken).now()) == candidate

    def check_password(self, input_pass):
        """
        Check the password of a user entry from a provided input password
        @param input_pass: The password to decode the secret with
        @return: True if the password was valid, or False if the operation could not complete.
        """
        # NOTE(review): encode('ascii', 'ignore') silently drops non-ASCII characters, so
        # passwords differing only in such characters are equivalent. Changing that would
        # invalidate existing hashes, so it is kept as-is here.
        try:
            scrypt.decrypt(  # Decode the crypto stream
                self.password.decode('hex'),  # By pulling the hexed password from the DB
                input_pass.encode('ascii', 'ignore'),  # And passing in the password
                0.5)  # Take half a second to do this.
        except (scrypt.error, TypeError, ValueError):
            # scrypt.error: wrong password; TypeError/ValueError: stored value is not valid hex.
            # The submitted password is deliberately NOT logged.
            print("AUTHMODEL: CheckPassword: FAILED User: {}".format(self.email))
            return False
        print("AUTHMODEL: CheckPassword: SUCCESS User: {}".format(self.email))
        return True  # The decrypted cookie itself is not inspected; decrypting is the proof.

    @staticmethod
    def _hash_password(input_pass):
        """
        Build the stored form of a password: a random 64 character cookie, scrypt-encrypted
        with the password and hex encoded.
        @param input_pass: The password to protect
        @return: The hex encoded scrypt blob
        @raise scrypt.error: If the encryption fails
        """
        cookie = ''.join(chr(random.randint(0, 255)) for _ in range(64))  # Store a Random Cookie
        return scrypt.encrypt(
            cookie,
            input_pass.encode('ascii', 'ignore'),  # Assure the password is ascii.
            0.5  # How long should we take?
        ).encode('hex')  # Store the completed password as hex

    def change_password(self, input_pass):
        """
        Change the password of a user entry from an new password
        @param input_pass: The password to register with RethinkDB
        @return: True if the record was updated, or False if the operation could not complete.
        """
        try:
            new_password = self._hash_password(input_pass)
        except scrypt.error:
            return False
        # The new password is deliberately NOT logged.
        print("AUTHMODEL: SetNewPassword: User: {}".format(self.email))
        try:
            db = rdb['userdb'].split(':')
            r.db(db[0]).table(db[1]).get(self.id).update(
                {"password": new_password}
            ).run(g.rdb_conn)
        except RqlRuntimeError:
            return False
        # Keep this in-memory object consistent with what was just stored.
        self.password = new_password
        return True

    # Convenience method
    @classmethod
    def create(cls, email, input_pass):
        """
        Create a new user entry from an email address and password
        @param email: The email address to register with RethinkDB
        @param input_pass: The password to register with RethinkDB
        @return: A User object instantiated from the new record, or None.
        """
        try:  # To make a crypted password
            new_password = cls._hash_password(input_pass)
        except scrypt.error:
            return None
        # The password is deliberately NOT logged.
        print("AUTHMODEL: CreateUser: User: {}".format(email))
        try:  # To make the database entry with the crypted password
            db = rdb['userdb'].split(':')
            inserted = r.db(db[0]).table(db[1]).insert(
                {"email": email, "password": new_password, "active": True, "admin": False}
            ).run(g.rdb_conn)
            return cls(inserted['generated_keys'][0])
        except (RqlRuntimeError, KeyError, IndexError, NoSuchUUIDExists):
            # Insert failed, reported no generated key, or the new record could not be read back.
            return None

    # Convenience method
    @classmethod
    def get_user_from_email(cls, email):
        """
        Create a user object from an email address
        @param email: The email address to request from RethinkDB
        @return: A User object instantiated from the requested email address, or None.
        """
        if email == "None":
            return None
        try:
            db = rdb['userdb'].split(':')
            cursor = r.db(db[0]).table(db[1]).filter(
                {'email': email}
            ).pluck('id').run(g.rdb_conn)
            for document in cursor:
                return cls(document['id'])  # First match wins.
        except (RqlRuntimeError, NoSuchUUIDExists):
            return None
        return None

    # Convenience method
    @classmethod
    def get_user_from_yubitoken(cls, yubitoken):
        """
        Create a user object from an yubikey token
        @param yubitoken: The YubiCo Identity to request from RethinkDB
        @return: A User object instantiated from the requested YubiCo Identity, or None.
        """
        if not yubitoken or yubitoken == "None":
            return None
        prefix = yubitoken[0:12]
        # The prefix is spliced into a regular expression below. Refuse anything that is not
        # plain alphanumeric so form input cannot smuggle in regex syntax, and so an empty
        # prefix cannot match every user.
        if not prefix.isalnum():
            return None
        try:
            db = rdb['userdb'].split(':')
            cursor = r.db(db[0]).table(db[1]).filter(  # Make sure the yubico field is there.
                lambda this_user: this_user.has_fields('identity_yubico')
            ).filter(  # Just in case someone stored a full token instead of identity.
                lambda row: row['identity_yubico'].match("^{}".format(prefix))
            ).pluck('id').run(g.rdb_conn)  # Return only the UUID for this record.
            for document in cursor:
                return cls(document['id'])  # Return the user object via instantiation
        except (RqlRuntimeError, NoSuchUUIDExists):
            return None
        return None

    def __repr__(self):
        return '<User {} Admin: {}>'.format(self.email, self.admin)
__author__ = '[email protected]'
########################################################################################################################
## Imports
########################################################################################################################
# Flask imports
from flask import Flask, request, session, g, redirect, url_for, abort, render_template, flash
# Flask-login imports
from flask.ext.login import login_user, logout_user, current_user, login_required
# Flask-classy imports
from flask.ext.classy import FlaskView, route
# Flask-WTF imports
from authform import AuthForm, RegisterForm, PasswdForm, YubiAuthForm
from app.config import allow_new_users
########################################################################################################################
## View Class
########################################################################################################################
def _safe_next_url():
    """
    Pick the post-login redirect target.
    The ``next`` query argument comes from the client, so it is only honoured when it
    points back at this site: a rooted path, or an absolute URL starting with this
    request's own host URL. Everything else falls back to the index page.
    @return: A URL string that is safe to pass to redirect().
    """
    fallback = url_for('BaseView:index')
    target = request.args.get('next')
    if not target:
        return fallback
    # Backslashes, whitespace and control characters can make a browser read a "path"
    # as a different host, so refuse them outright.
    if '\\' in target or any(ch.isspace() or ord(ch) < 32 for ch in target):
        return fallback
    if target.startswith('/') and not target.startswith('//'):
        return target
    if target.startswith(request.host_url):
        return target
    return fallback


class AuthView(FlaskView):
    """
    A simple authentication view for Flask-Classy.
    Relies on flask-wtf forms to do user lookups and provide a User object, if found, to the View.
    """

    def index(self):
        """
        Display of a Flask-Login compatible Flask-WTF form for standard Email + Passwords.
        @return: A Jinja2 Template containing a login form
        """
        form = AuthForm()
        return render_template('auth/login.html', form=form)

    def post(self):
        """
        Processing of a Flask-Login compatible Flask-WTF form
        @return: A Jinja2 Template containing a login form, or a redirect to the index or next page.
        """
        form = AuthForm()
        if form.validate_on_submit():
            login_user(form.user, remember=True)
            flash('logged in', 'success')
            return redirect(_safe_next_url())
        return render_template('auth/login.html', form=form)

    def logout(self):
        """
        Processing of a Flask-Login compatible Logout method
        @return: A redirect to the index page.
        """
        logout_user()
        return redirect(url_for('BaseView:index'))

    def no_register(self):
        """
        Displays a message preventing registering a new account.
        @return: A Jinja2 Template containing a static error message
        """
        return render_template('auth/no_register.html')

    @login_required
    def new_user(self):
        """
        Display of a Flask-Login compatible Flask-WTF form for registering a new account.
        @return: A Jinja2 Template containing a registration form, or the static
                 "no registration" page when new users are not allowed.
        """
        if not allow_new_users:
            return render_template('auth/no_register.html')
        form = RegisterForm()
        return render_template('auth/register.html', form=form)

    @route('do_register', methods=['POST'])
    @login_required
    def do_register(self):
        """
        Processing of a Flask-Login compatible Flask-WTF form
        @return: A Jinja2 Template containing a registration form, or a redirect to the index or next page.
        """
        if not allow_new_users:
            return render_template('auth/no_register.html')
        form = RegisterForm()
        if form.validate_on_submit():
            # Validation created the account; the session switches to the new user.
            login_user(form.user, remember=True)
            flash('registered and logged in', 'success')
            return redirect(_safe_next_url())
        return render_template('auth/register.html', form=form)

    def yubikey(self):
        """
        Display of a Flask-Login compatible Flask-WTF form for YubiKey Authentication.
        @return: A Jinja2 Template containing a form with a single YubiKey field
        """
        form = YubiAuthForm()
        return render_template('auth/yubikey.html', form=form)

    @route('do_yubikey', methods=['POST'])
    def do_yubikey(self):
        """
        Processing of a Flask-Login compatible Flask-WTF form for YubiKey Authentication.
        @return: A Jinja2 Template containing a YubiKey login form, or a redirect to the index or next page.
        """
        form = YubiAuthForm()
        if form.validate_on_submit():
            login_user(form.user, remember=True)
            flash('logged in', 'success')
            return redirect(_safe_next_url())
        return render_template('auth/yubikey.html', form=form)

    @login_required
    def passwd(self):
        """
        Display of a Flask-Login compatible Flask-WTF form for Changing Passwords.
        @return: A Jinja2 Template containing a passwd form
        """
        form = PasswdForm()
        return render_template('auth/passwd.html', form=form)

    # Route decorators always come first.
    @route('do_passwd', methods=['POST'])
    @login_required
    def do_passwd(self):
        """
        Processing of a Flask-Login compatible Flask-WTF form for Changing Passwords.
        @return: A Jinja2 Template containing a passwd form, or a redirect to the index or next page.
        """
        form = PasswdForm()
        if form.validate_on_submit():
            # change_password() reports failure by returning False; only claim success when it worked.
            if current_user.change_password(form.newpassword.data):
                flash('password changed', 'success')
                return redirect(_safe_next_url())
            flash('password could not be changed', 'error')
        return render_template('auth/passwd.html', form=form)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment