Last active
August 29, 2015 14:11
-
-
Save jarshwah/c5b9abebb452f2e3286f to your computer and use it in GitHub Desktop.
Django Custom Auth Backend - if refactored
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import hashlib | |
import hmac | |
import requests | |
import time | |
from django.contrib.auth import get_user_model | |
from django.conf import settings | |
from django.core.cache import cache | |
from django.utils.http import unquote | |
from .models import Tenant, Task, UserTask | |
class APIException(Exception):
    """Raised when the remote API cannot be used, e.g. when the service login fails."""
class APIAuth(requests.auth.AuthBase):
    """``requests`` auth hook that signs every outgoing request.

    The hook appends ``api_key`` and ``timestamp`` query parameters to the
    request URL, computes an HMAC-SHA256 of the (unquoted) final URL keyed
    with ``settings.SHARED_SECRET``, and sends the base64-encoded digest in
    the ``Authorization`` header.
    """

    def __call__(self, prepared):
        """Sign *prepared* (a prepared request) in place and return it."""
        # The timestamp is sent in milliseconds since the epoch.
        extra_params = {
            'api_key': settings.API_KEY,
            'timestamp': int(time.time() * 1000),
        }
        prepared.prepare_url(prepared.url, extra_params)

        # The signature covers the full URL *after* the parameters above
        # were merged in, with percent-escapes decoded.
        secret = settings.SHARED_SECRET.encode()
        message = unquote(prepared.url).encode()
        digest = hmac.new(
            secret, msg=message, digestmod=hashlib.sha256).digest()
        signature = base64.b64encode(digest).decode()

        prepared.headers['Authorization'] = "signature {0}".format(signature)
        return prepared
class APIBackend(object):
    """Django authentication backend that checks credentials against a remote API.

    Credentials are verified by POSTing them to the API's ``authenticate``
    endpoint.  Every API call carries a ``sessionId`` obtained by logging in
    with the service account from ``settings.API_USER`` / ``settings.API_PASS``;
    that session id is shared through the Django cache under ``cache_key``.

    Permissions are expressed as ``"<task.module>.<task.name>"`` strings built
    from the ``UserTask`` rows of a user.
    """

    def __init__(self):
        self.auth = APIAuth()
        self.api_key = settings.API_KEY
        self.gen_user = settings.API_USER
        self.gen_pw = settings.API_PASS
        self.cache_key = 'api_session_key'
        # Endpoint name -> absolute URL (API_URL is used as a plain prefix).
        self.urls = {
            method: settings.API_URL + method
            for method in (
                'login', 'authenticate', 'change_password', 'get_roles')}

    def authenticate(self, username=None, password=None, **kwargs):
        """Return the local user for valid API credentials, else ``None``.

        A local user is created on first successful login, populated from the
        ``Result`` object of the API response.  Raises ``APIException`` if the
        service-account login needed to obtain a session id fails.
        """
        resp = self._api_authenticate(username, password)
        if not resp.ok:
            return None

        # Tolerate a missing/null 'Result' so the defaults below still apply.
        result = resp.json().get('Result') or {}
        User = get_user_model()
        try:
            user = User._default_manager.get_by_natural_key(username)
        except User.DoesNotExist:
            # NOTE(review): the email fallback below looks like an obfuscated
            # placeholder address -- confirm the intended default.
            user = User(
                username=username,
                first_name=result.get('firstName', 'Unknown'),
                last_name=result.get('lastName', 'Unknown'),
                email=result.get('email', '[email protected]'),
            )
            # Mark the password unusable *before* saving; doing it afterwards
            # only changes the in-memory instance and is never persisted.
            user.set_unusable_password()
            user.save()
        self._cache_permissions(user)
        return user

    def get_user(self, user_id):
        """Return the user with primary key *user_id*, or ``None`` if absent."""
        User = get_user_model()
        try:
            return User._default_manager.get(pk=user_id)
        except User.DoesNotExist:
            return None

    def get_all_permissions(self, user_obj, obj=None):
        """Return the set of ``"module.name"`` permission strings for *user_obj*.

        The set is computed once from ``UserTask`` and memoised on the user
        object as ``_api_perm_cache``.  *obj* is accepted but not used.
        """
        if not hasattr(user_obj, '_api_perm_cache'):
            user_tasks = UserTask.objects.select_related(
                'user', 'task').filter(user=user_obj)
            user_obj._api_perm_cache = set(
                "%s.%s" % (ut.task.module, ut.task.name)
                for ut in user_tasks)
        return user_obj._api_perm_cache

    def has_perm(self, user_obj, perm, obj=None):
        """Return whether *perm* is among the user's permissions."""
        return perm in self.get_all_permissions(user_obj, obj)

    def has_module_perms(self, user_obj, module):
        """Return whether the user holds any permission in *module*.

        This is the hook name Django looks up on authentication backends.
        Only the text before the first ``.`` of each permission is compared,
        and a permission string without a dot never raises.
        """
        return any(
            perm.partition('.')[0] == module
            for perm in self.get_all_permissions(user_obj))

    # Original (singular) spelling, kept so existing callers keep working.
    has_module_perm = has_module_perms

    def change_password(self, user_obj, current_password, new_password):
        """Change the user's password through the API; return ``resp.ok``."""
        resp = self._api_change_password(
            user_obj.username, current_password, new_password)
        return resp.ok

    @property
    def api_session_id(self):
        """Session id for API calls, from the cache or a fresh service login.

        The value is memoised on this backend instance.  Raises
        ``APIException`` if a login is required and fails.
        """
        if not hasattr(self, '_api_session_id'):
            sid = cache.get(self.cache_key)
            if sid is None:
                # _api_login() stores the new id on self directly.  Do not
                # re-read it from the cache: with a dummy or failing cache
                # that would replace the fresh id with None.
                self._api_login()
            else:
                self._api_session_id = sid
        return self._api_session_id

    def _api_login(self):
        """Log in with the service account and store the session id.

        On success the id is cached for one day and kept on the instance, and
        the response is returned.  Raises ``APIException`` otherwise.
        """
        url = self.urls['login']
        post_data = {'username': self.gen_user, 'password': self.gen_pw}
        resp = requests.post(url, data=post_data, auth=self.auth)
        if not resp.ok:
            raise APIException(
                "Could not login to API. Status Code %s" % resp.status_code)
        session_id = resp.json().get('Result').get('sessionId')
        cache.set(self.cache_key, session_id, timeout=60*60*24*1)  # 1 day
        self._api_session_id = session_id
        return resp

    def _api_authenticate(self, username, password):
        """POST the credentials to the ``authenticate`` endpoint; return the response."""
        url = self.urls['authenticate']
        get_data = {'sessionId': self.api_session_id}
        post_data = {'username': username, 'password': password}
        resp = requests.post(
            url,
            params=get_data,
            data=post_data,
            auth=self.auth)
        return resp

    def _api_get_roles(self, username):
        """GET the ``get_roles`` endpoint for *username*; return the response."""
        url = self.urls['get_roles']
        get_data = {'username': username, 'sessionId': self.api_session_id}
        resp = requests.get(url, params=get_data, auth=self.auth)
        return resp

    def _cache_permissions(self, user):
        pass  # removed for brevity

    def _api_change_password(self, username, old_password, new_password):
        """POST a password change to the ``change_password`` endpoint; return the response."""
        url = self.urls['change_password']
        post_data = {
            'username': username,
            'oldpassword': old_password,
            'newpassword': new_password
        }
        get_data = {'sessionId': self.api_session_id}
        resp = requests.post(
            url, params=get_data, data=post_data, auth=self.auth)
        return resp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import OrderedDict | |
from django.contrib.auth.forms import PasswordChangeForm | |
from crispy_forms.helper import FormHelper | |
class APIPasswordChangeForm(PasswordChangeForm):
    """Password change form that takes its user from a ``user`` keyword argument.

    The crispy-forms helper below only controls how the form is rendered.
    """

    # this is now only needed for styling purposes
    helper = FormHelper()
    helper.form_tag = False
    helper.label_class = 'col-md-5'
    helper.field_class = 'col-md-7'

    def __init__(self, *args, **kwargs):
        """Pop the required ``user`` kwarg and pass it on positionally.

        Raises ``KeyError`` when ``user`` is not supplied.
        """
        form_user = kwargs.pop('user')
        super(APIPasswordChangeForm, self).__init__(
            form_user, *args, **kwargs)
# Rebuild base_fields so the three password fields appear in this fixed
# order: current password first, then the new password and its confirmation.
APIPasswordChangeForm.base_fields = OrderedDict([
    (field_name, APIPasswordChangeForm.base_fields[field_name])
    for field_name in ('old_password', 'new_password1', 'new_password2')
])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.core.urlresolvers import reverse_lazy | |
from django.views.generic.edit import FormView | |
from braces.views import LoginRequiredMixin, UserFormKwargsMixin | |
from .forms import APIPasswordChangeForm | |
class ChangePasswordFormView(LoginRequiredMixin, UserFormKwargsMixin, FormView):
    """Form view that lets the current user change their password.

    ``LoginRequiredMixin`` is listed first so the login check runs before the
    form is built: the form is handed the request's user via
    ``UserFormKwargsMixin`` and only makes sense for an authenticated user.
    """
    template_name = 'api/change_password.html'
    form_class = APIPasswordChangeForm
    success_url = reverse_lazy('api:change_password_success')

    def form_valid(self, form):
        """Save the validated form, then continue with the normal success handling."""
        form.save()
        return super(ChangePasswordFormView, self).form_valid(form)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment