Skip to content

Instantly share code, notes, and snippets.

@alukach
Last active April 6, 2017 02:42
Show Gist options
  • Save alukach/a136b5c07d1b9d2bd8b36173ec53f656 to your computer and use it in GitHub Desktop.
Save alukach/a136b5c07d1b9d2bd8b36173ec53f656 to your computer and use it in GitHub Desktop.
A custom token authentication system for DRF.
#
# Token Manager
# https://docs.djangoproject.com/en/1.10/topics/signing/
#
from django.core import signing
from django.contrib.auth import get_user_model
class TemporaryApiToken():
"""
Class to manage temporary limited-access API tokens. Manages serialization
and deserialization of API permission payload to/from signed token.
user: User that will be authenticated by token.
endpoints: key:value pairs of HTTP methods and endpoint roots that
token is authorized to access. The following values would authorize
the token to make GET requests to any endpoints that begin with
'/api/v1/foo':
{'GET': ['/api/v1/foo']}
NOTE: This this token will not override any existing permissions
for its associatted User within the system. It only adds further
restrictions to the endpoints that can be accessed.
max_age: How long, in seconds, the token will be valid. By default,
tokens will be valid for 1 hour. Non-expiring tokens are not
supported.
recipient: (Optional) A textual description of the recipient for which
this token was intended. No validation is done with this data,
however it is appended to the request as a 'X-API-Token-Recipient'
header by the accompanying DRF authentication scheme. This is for
tracking purposes.
"""
SignatureExpired = signing.SignatureExpired
BadSignature = signing.BadSignature
def __init__(self, user, endpoints: dict, max_age: int=360, recipient: str=None):
self.user = user
self.endpoints = endpoints
self.max_age = max_age
self.recipient = recipient
self._validate()
def generate_signed_token(self):
unsigned_token = {
'user': self.user.id,
'max_age': self.max_age,
'endpoints': self.endpoints,
}
if self.recipient is not None:
unsigned_token['recipient'] = self.recipient
return signing.dumps(unsigned_token)
def authenticate(self, request):
for endpoint in self.endpoints.get(request.method, []):
if request.path.startswith(endpoint):
return (self.user, None)
raise ValueError(
'Endpoint interaction not permitted by token')
@classmethod
def from_signed_token(cls, signed_token):
""" Return instance from signed token """
unsigned_token = cls._parse_token(signed_token)
unsigned_token['user'] = cls._get_user(unsigned_token['user'])
cls._parse_token(signed_token, max_age=unsigned_token['max_age'])
return cls(**unsigned_token)
def _validate(self):
""" Ensure token has properly formatted attributes """
assert getattr(self.user, 'id', None), "user must have id attribute"
assert isinstance(self.max_age, int), "max_age must be an int"
assert isinstance(self.endpoints, dict), "endpoints must be a dict"
valid_methods = ('GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'HEAD',
'OPTIONS')
for method, endpoints in self.endpoints.items():
assert method in valid_methods, \
"Unsupported method type: {}".format(method)
assert isinstance(endpoints, (list, tuple)), \
("Endpoints must be a list or tuple,"
" got {}".format(type(endpoints).__name__))
for endpoint in endpoints:
assert isinstance(endpoint, str), "Endpoints must be strings"
assert endpoint.startswith('/'), \
"Endpoints must begin with a slash"
@staticmethod
def _get_user(user_id):
User = get_user_model()
try:
return User.objects.get(id=user_id)
except User.DoesNotExist:
raise ValueError("No such user")
@staticmethod
def _parse_token(signed_token, max_age=None):
return signing.loads(signed_token, max_age=max_age)
#
# Authentication for DRF
# http://www.django-rest-framework.org/api-guide/authentication/#custom-authentication
#
from rest_framework import authentication
from rest_framework import exceptions
class ApiTokenAuthentication(authentication.BaseAuthentication):
"""
Authentication scheme to authenticate with a token located in the
'Authorization' header or as a 'TOKEN' query parameter.
"""
def authenticate(self, request):
signed_token = request.META.get('Authorization')
if signed_token:
try:
_, signed_token = signed_token.split('Token ')
except (IndexError, ValueError):
return
signed_token = signed_token or request.GET.get('AUTH_TOKEN')
if signed_token:
try:
token = TemporaryApiToken.from_signed_token(signed_token)
if token.recipient:
request.META['X-API-Token-Recipient'] = token.recipient
return token.authenticate(request)
except (AssertionError, ValueError) as e:
raise exceptions.AuthenticationFailed(e)
except TemporaryApiToken.SignatureExpired:
raise exceptions.AuthenticationFailed("Token has expired")
except TemporaryApiToken.BadSignature:
raise exceptions.AuthenticationFailed("Bad API token")
#
# Tests
#
from rest_framework.test import APIRequestFactory
factory = APIRequestFactory()
user = User.objects.first()
# Good Request
t = TemporaryApiToken(
user=user,
endpoints=dict(GET=['/bar'], POST=['/foo']),
max_age=10,
recipient='my-new-microservice'
)
request = factory.post(
'/foo/some-nested-endpoint/',
Authorization="Token " + t.generate_signed_token()
)
ApiTokenAuthentication().authenticate(request)
assert request.META.get('X-API-Token-Recipient') == "my-new-microservice"
print("POST request approved")
# Good Request w/ Query Arg
t = TemporaryApiToken(
user=user,
endpoints=dict(GET=['/foo', '/bar'], POST=['/foo']),
max_age=10,
recipient='my-new-microservice'
)
request = factory.get(
'/foo/some-nested-endpoint/',
data={'AUTH_TOKEN': t.generate_signed_token()}
)
ApiTokenAuthentication().authenticate(request)
assert request.META.get('X-API-Token-Recipient') == "my-new-microservice"
print("Query Arg request approved")
# Bad Path
request = factory.get(
'/secret',
Authorization="Token " + t.generate_signed_token()
)
try:
ApiTokenAuthentication().authenticate(request)
except Exception as e:
assert str(e) == "Endpoint interaction not permitted by token", \
"Wrong err: {}".format(e)
print("Request correctly rejected due to path")
else:
assert 0, "Token did not throw error!"
# Expired Token
t = TemporaryApiToken(
user=user,
endpoints=dict(GET=['/foo']),
max_age=0 # Immediately expired
)
request = factory.get(
'/foo/bar',
Authorization="Token " + t.generate_signed_token()
)
try:
ApiTokenAuthentication().authenticate(request)
except Exception as e:
assert str(e) == "Token has expired", "Wrong err: {}".format(e)
print("Request correctly rejected due to expiration")
else:
assert 0, "Token did not throw error!"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment