Skip to content

Instantly share code, notes, and snippets.

@mmerickel
Last active March 30, 2016 20:47
Show Gist options
  • Save mmerickel/54bc3922b4db8e951281 to your computer and use it in GitHub Desktop.
Save mmerickel/54bc3922b4db8e951281 to your computer and use it in GitHub Desktop.
Bind the session lifecycle to external request properties, such as the authenticated user.
from pyramid.httpexceptions import HTTPSeeOther
from pyramid.view import view_config
@view_config(...)
def login_view(request):
    """Log a user in, bind the session to that user, and redirect to 'home'.

    This is a template: the user lookup/validation is left as a placeholder
    that must be filled in before the view can run.
    """
    # imported locally so the view does not rely on a name the module
    # never imports
    from pyramid.security import remember

    # ... validate the user
    user = ...  # placeholder: the validated user object (must expose ``.id``)

    next_url = request.route_url('home')
    headers = remember(request, user.id)

    # bind the session to the user upon login
    # the session will be invalidated (always invalidate a session upon login)
    request.bind_session_to_user(user)
    return HTTPSeeOther(next_url, headers=headers)
@view_config(...)
def logout_view(request):
    """Log the user out, invalidate the session, and redirect to 'home'."""
    # imported locally so the view does not rely on a name the module
    # never imports
    from pyramid.security import forget

    # ...
    next_url = request.route_url('home')
    headers = forget(request)

    # always invalidate a session when crossing privilege boundaries
    request.session.invalidate()
    return HTTPSeeOther(next_url, headers=headers)
from .session_protector import (
session_protector,
bind_session_to_user,
)
def includeme(config):
    """Configure a signed-cookie session factory guarded by session_protector.

    Reads the ``session.*`` keys from the deployment settings. All of them
    are required except ``session.secure``, which is looked up with
    ``settings.get`` and passed through ``asbool``. Also registers
    ``bind_session_to_user`` as a request method so views can call
    ``request.bind_session_to_user(user)``.
    """
    # imported locally so the function does not rely on names the module
    # never imports
    from pyramid.session import SignedCookieSessionFactory
    from pyramid.settings import asbool

    settings = config.get_settings()
    session_factory = SignedCookieSessionFactory(
        settings['session.secret'],
        salt=settings['session.salt'],
        cookie_name=settings['session.cookie_name'],
        secure=asbool(settings.get('session.secure')),
        httponly=True,
        hashalg='sha256',
        timeout=int(settings['session.timeout']),
        reissue_time=int(settings['session.reissue_time']),
    )

    # wrap the raw factory so every session it creates is checked against
    # the token expected for the current request
    session_factory = session_protector(session_factory)
    config.set_session_factory(session_factory)
    config.add_request_method(bind_session_to_user)
import functools
log = __import__('logging').getLogger(__name__)

# Session key under which the binding token is stored.
SESSION_BINDING_TOKEN_KEY = '__uid'
# Shorter spelling of the same key, kept so existing references keep working.
SESSION_BINDING_TOKEN = SESSION_BINDING_TOKEN_KEY

# Sentinel meaning "no value supplied", distinct from an explicit None.
_marker = object()
def bind_session_to_user(request, user, session=None, invalidate=True):
    """Store the binding token for ``user`` in the session.

    :param request: the current request; ``request.session`` is read when
        no ``session`` is supplied.
    :param user: the user to bind to. ``None`` yields an empty token;
        otherwise the token is derived from ``user.id``.
    :param session: an explicit session to bind. When ``None`` the session
        is taken from the request with binding checks temporarily disabled.
    :param invalidate: when true (the default) the session is invalidated
        before the token is written.
    """
    if session is None:
        # avoid binding checks while performing a binding
        # otherwise the session may inadvertently be invalidated/cleared
        old_val = getattr(request, '_skip_session_binding_checks', _marker)
        request._skip_session_binding_checks = True
        try:
            session = request.session
        finally:
            # put the flag back exactly as it was, even if obtaining the
            # session raised, so later accesses are checked again
            if old_val is not _marker:
                request._skip_session_binding_checks = old_val
            else:
                del request._skip_session_binding_checks

    if invalidate:
        log.debug('invalidating session prior to bind')
        session.invalidate()

    token = make_session_token(request, user)
    log.info('binding session with token="%s"', token)
    session[SESSION_BINDING_TOKEN] = token
def make_session_token(request, user=_marker):
    """Return the binding token for a user, or '' when there is no user.

    When ``user`` is omitted the user is read from ``request.user``. An
    explicit ``None`` is honoured and means "no user".
    """
    subject = request.user if user is _marker else user
    if subject is None:
        return ''
    return 'u:{0}'.format(subject.id)
def session_protector(factory):
    """Wrap a session factory so its sessions are tied to the request's user.

    The returned callable builds the session with ``factory(request)`` and
    compares the token stored in it with the token expected for the request
    (see ``make_session_token``). On any mismatch the session is invalidated
    and, if a token is expected, the fresh session is bound to it. The check
    is skipped while ``request._skip_session_binding_checks`` is truthy.

    :param factory: a callable taking a request and returning a session.
    :returns: a wrapper with the same call signature as ``factory``.
    """
    @functools.wraps(factory)
    def wrapper(request):
        session = factory(request)

        # an explicit bind is in progress; hand the session back untouched
        if getattr(request, '_skip_session_binding_checks', False):
            log.debug('skipping automatic session binding')
            return session

        expected_token = make_session_token(request)
        found_token = session.get(SESSION_BINDING_TOKEN, '')

        invalid_session = False
        if expected_token:
            if found_token == expected_token:
                log.debug('found expected session binding token="%s"',
                          expected_token)
            elif found_token:
                log.info('detected session for token="%s" but '
                         'expected token="%s", invalidating session',
                         found_token, expected_token)
                invalid_session = True
            else:
                log.info('unbound session, expected token="%s", '
                         'invalidating session', expected_token)
                invalid_session = True
        elif found_token:
            log.info('detected bound session for token="%s" but '
                     'expected no token, invalidating session',
                     found_token)
            invalid_session = True
        else:
            log.debug('found unbound session')

        if invalid_session:
            session.invalidate()
            # only a session that was just invalidated is rebound here
            if expected_token:
                log.info('binding new session with token="%s"', expected_token)
                session[SESSION_BINDING_TOKEN] = expected_token
        return session
    return wrapper
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment