Last active
March 30, 2016 20:47
-
-
Save mmerickel/54bc3922b4db8e951281 to your computer and use it in GitHub Desktop.
Bind the session lifecycle to external request properties, such as the authenticated user.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyramid.httpexceptions import HTTPSeeOther
from pyramid.security import forget, remember
from pyramid.view import view_config
@view_config(...)
def login_view(request):
    """Log a user in and bind a fresh session to them.

    Example view: the credential check and user lookup are elided and
    must be filled in by the application.

    :param request: the current request; it must expose the
        ``bind_session_to_user`` request method.
    :returns: an ``HTTPSeeOther`` redirect to the ``home`` route that
        carries the headers produced by ``remember``.
    """
    # ... validate the user
    user = ...  # placeholder: the validated user object (elided in this example)

    next_url = request.route_url('home')
    headers = remember(request, user.id)

    # bind the session to the user upon login
    # the session will be invalidated (always invalidate a session upon login)
    # NOTE(review): if the authentication policy stores its state in the
    # session, confirm that invalidating after remember() does not discard it.
    request.bind_session_to_user(user)
    return HTTPSeeOther(next_url, headers=headers)
@view_config(...)
def logout_view(request):
    """Log the current user out and discard their session.

    Example view: any application-specific logout work is elided.

    :param request: the current request.
    :returns: an ``HTTPSeeOther`` redirect to the ``home`` route that
        carries the headers produced by ``forget``.
    """
    # ...
    next_url = request.route_url('home')
    headers = forget(request)

    # always invalidate a session when crossing privilege boundaries
    request.session.invalidate()
    return HTTPSeeOther(next_url, headers=headers)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyramid.session import SignedCookieSessionFactory
from pyramid.settings import asbool

from .session_protector import (
    bind_session_to_user,
    session_protector,
)
def includeme(config):
    """Configure a user-bound, signed-cookie session factory.

    Reads the ``session.secret``, ``session.salt``, ``session.cookie_name``,
    ``session.timeout`` and ``session.reissue_time`` settings (all looked up
    by subscript, so they are required) plus the optional ``session.secure``
    flag, builds a ``SignedCookieSessionFactory`` from them, wraps it with
    ``session_protector`` and installs the result as the session factory.
    It also registers ``bind_session_to_user`` as a request method so views
    can call ``request.bind_session_to_user(user)``.

    :param config: the configurator being set up.
    """
    settings = config.get_settings()

    session_factory = SignedCookieSessionFactory(
        settings['session.secret'],
        salt=settings['session.salt'],
        cookie_name=settings['session.cookie_name'],
        secure=asbool(settings.get('session.secure')),
        httponly=True,
        hashalg='sha256',
        timeout=int(settings['session.timeout']),
        reissue_time=int(settings['session.reissue_time']),
    )

    # Every session created from here on goes through the binding checks.
    session_factory = session_protector(session_factory)
    config.set_session_factory(session_factory)

    config.add_request_method(bind_session_to_user)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools
import logging

log = logging.getLogger(__name__)

# Session key under which the user-binding token is stored.
SESSION_BINDING_TOKEN = '__uid'

# Sentinel meaning "no value supplied", distinct from None.
_marker = object()
def bind_session_to_user(request, user, session=None, invalidate=True):
    """Bind a session to ``user`` by storing that user's binding token in it.

    :param request: the current request.
    :param user: the user to bind to; passed to ``make_session_token``.
    :param session: the session to bind. When ``None``, ``request.session``
        is used and the automatic binding checks are skipped while it is
        being fetched.
    :param invalidate: when true (the default), the session is invalidated
        before the token is written.
    """
    if session is None:
        # avoid binding checks while performing a binding
        # otherwise the session may inadvertently be invalidated/cleared
        old_val = getattr(request, '_skip_session_binding_checks', _marker)
        request._skip_session_binding_checks = True
        try:
            session = request.session
        finally:
            # Restore the flag even if fetching the session raises, so the
            # binding checks are not left disabled on this request.
            if old_val is not _marker:
                request._skip_session_binding_checks = old_val
            else:
                del request._skip_session_binding_checks

    if invalidate:
        log.debug('invalidating session prior to bind')
        session.invalidate()

    token = make_session_token(request, user)
    log.info('binding session with token="%s"', token)
    # Use the constant this module actually defines; the previous name
    # (SESSION_BINDING_TOKEN_KEY) was undefined and raised NameError.
    session[SESSION_BINDING_TOKEN] = token
def make_session_token(request, user=_marker):
    """Return the binding token for a user, or ``''`` when there is none.

    :param request: the current request; ``request.user`` is read only when
        ``user`` is not supplied.
    :param user: the user to build the token for. An explicit ``None``
        means "no user" and yields the empty token.
    :returns: ``'u:<user.id>'`` for a user, otherwise the empty string.
    """
    uid = ''
    if user is _marker:
        user = request.user
    if user is not None:
        uid = 'u:{0}'.format(user.id)
    return uid
def session_protector(factory):
    """Wrap a session factory so each session is checked against the user.

    The returned factory builds the session with ``factory`` and compares
    the binding token stored in it with the token expected for the current
    request (``make_session_token(request)``, which reads ``request.user``).
    If the two disagree the session is invalidated, and when a token is
    expected the fresh session is bound to it. The checks are skipped
    entirely while ``request._skip_session_binding_checks`` is true.

    :param factory: a callable taking the request and returning a session.
    :returns: a callable with the same signature as ``factory``.
    """
    @functools.wraps(factory)
    def wrapper(request):
        session = factory(request)

        if getattr(request, '_skip_session_binding_checks', False):
            log.debug('skipping automatic session binding')
            return session

        expected_token = make_session_token(request)
        # Use the constant this module actually defines; the previous name
        # (SESSION_BINDING_TOKEN_KEY) was undefined and raised NameError.
        found_token = session.get(SESSION_BINDING_TOKEN, '')

        invalid_session = False
        if expected_token:
            if found_token == expected_token:
                log.debug('found expected session binding token="%s"',
                          expected_token)
            elif found_token:
                # The session belongs to a different user.
                log.info('detected session for token="%s" but '
                         'expected token="%s", invalidating session',
                         found_token, expected_token)
                invalid_session = True
            else:
                # A user is present but the session was never bound.
                log.info('unbound session, expected token="%s", '
                         'invalidating session', expected_token)
                invalid_session = True
        elif found_token:
            # No user on the request, yet the session is bound to one.
            log.info('detected bound session for token="%s" but '
                     'expected no token, invalidating session',
                     found_token)
            invalid_session = True
        else:
            log.debug('found unbound session')

        if invalid_session:
            session.invalidate()
            if expected_token:
                log.info('binding new session with token="%s"', expected_token)
                session[SESSION_BINDING_TOKEN] = expected_token

        return session
    return wrapper
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment