Last active
April 24, 2024 14:06
-
-
Save lordsarcastic/a3681c314b6c00e96e10f10f60fe0ce8 to your computer and use it in GitHub Desktop.
This is a generic implementation of OTP for Django applications. It can be extended to any framework or platform that uses OTP.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This is a robust implementation that is extensible for anything that requires the use of OTP. Want to use OTP to verify | |
a user? Check! Want to use OTP to validate an order? CHeck! Want to use OTP to reset a password? CHEck! Want to use | |
OTP to verify a device? CHECk! Want to use OTP to fight people? CHECK! | |
I used: Django (framework), Django Rest Framework (a plugin for REST API), PostgreSQL (database) | |
and email (for sending the otp). You can substitute any of these for whatever you want, like using Redis instead of Postgres. | |
While this solution is built to be used for Django, I have added comments to explain the process for developers using | |
other frameworks. | |
Legend: | |
- otp.py: Base logic for implementing OTP functionality in applications | |
- user.py: User model used in OTP example | |
- utils.models.py: Contains base model for all models | |
- User verification process.py: An example using OTP to verify a user's account | |
- Reset password.py: An example using OTP to implement forgot password |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from random import randrange | |
from typing import Any, Dict, Optional, TypedDict, Union | |
from django.core import signing | |
from django.core.mail import EmailMultiAlternatives | |
from django.db import models | |
from django.utils import timezone | |
# a user-defined abstract model that includes `uuid`, `created` and `last_updated` | |
# to track the state of a model. UUID is the primary key of the model. | |
from utils.models import TrackObjectStateMixin | |
from .user import User | |
class CompleteOTPType(TypedDict): | |
instance: "AuthOTP" | |
code: int | |
class CreateUserAuthOTPType(TypedDict): | |
user_auth_otp: "UserAuthOTP" | |
complete_otp: CompleteOTPType | |
class AuthOTP(TrackObjectStateMixin): | |
# Number of seconds till the OTP expires | |
TIMEOUT = 5 * 60 | |
# this length should never go below 4 | |
OTP_LENGTH = 6 | |
# this is the actual code sent to the user. it is encrypted | |
# and signed and not stored as plain text for security purposes | |
code = models.CharField(max_length=128) | |
# we use a duration field to store the number of seconds | |
# the OTP expires from the time of creation, `created`. | |
timeout = models.DurationField(default=timezone.timedelta(seconds=TIMEOUT)) | |
def __str__(self): | |
return f"AuthOTP instance at time: {self.created}" | |
@classmethod | |
def generate_otp(cls, **extra_fields) -> CompleteOTPType: | |
""" | |
The idea behind the generation of an otp or anything secret is simple. | |
First, we generate a random code: the otp itself. | |
This value is only visible to the user and cannot be inspected in | |
any way. To ensure that this process is reversible for verifying | |
OTPs, we do this: | |
- Use the uuid of the AuthOTP instance as a salt to guarantee | |
randomness | |
- Sign the generated code and strip the code out of the output | |
- Then we store the result from above as the `instance.code`. | |
""" | |
otp: cls = cls(**extra_fields) | |
# we know this will always be the length `cls.OTP_LENGTH` digits | |
# so we can slice this out after signing | |
code = randrange( | |
10 ** (cls.OTP_LENGTH - 1), (10**cls.OTP_LENGTH) - 1 | |
) | |
# you can swap this for any encryption format you want. I want to | |
# stick to core Django | |
signed_code = signing.TimestampSigner(salt=str(otp.uuid)).sign(code) | |
# we strip out the characters of the length `cls.OTP_LENGTH` | |
# this is only because the sigining algorithm from above | |
# adds the original code generated to the output. Of course we | |
# don't want that to be available in plain sight :D | |
otp.code = signed_code[cls.OTP_LENGTH :] | |
otp.save() | |
# this is the only point you can ever access the actual OTP code | |
# If the output is not saved in a variable, it is gone forever | |
# and ever. and ever. and ever. | |
return {"instance": otp, "code": code} | |
@classmethod | |
def verify_otp(cls, otp_instance: "AuthOTP", code: int) -> bool: | |
""" | |
Verification only requires that user can provide the first | |
4 values of `instance.code` which is the OTP sent to the user's mail. | |
Without the OTP, it is impossible to verify the OTP even from the | |
stored instance. This guarantees security and places it in the | |
hands of the user. | |
""" | |
try: | |
unsigned_code = int( | |
signing.TimestampSigner(salt=(str(otp_instance.uuid))).unsign( | |
f"{code}{otp_instance.code}", max_age=otp_instance.timeout | |
) | |
) | |
except (signing.BadSignature, signing.SignatureExpired, IndexError): | |
return False | |
return unsigned_code == code | |
def has_expired(self) -> bool: | |
# we keep this for jobs to have an API accessible to clear | |
# expired otps. | |
return self.created + self.timeout < timezone.now() | |
class UserAuthOTP(models.Model): | |
""" | |
An intermediary table to bind users and otps as otps are meant to be | |
anonymous. This table handles management of OTPs for various reasons | |
throughout the codebase. Want to generate OTPs to confirm an order? | |
Simply add it to the `OTPReasons` enum. | |
""" | |
# we expect this table to be used for future OTPs | |
# in various respect. All the developer has to do | |
# is add a new `Reason` and do as seen fit. | |
# for non Django devs: the chars in all caps are | |
# the actual text stored in the DB. | |
class OTPReasons(models.TextChoices): | |
ACTIVATE_USER = "ACTIVATE_USER", "Activate User" | |
FORGOT_PASSWORD = "FORGOT_PASSWORD", "Forgot password" | |
# nothing special, the line was simply too long | |
FORGOT_PASSWORD_TOKEN = ( | |
"FORGOT_PASSWORD_TOKEN", | |
"Forgot password token", | |
) | |
user = models.ForeignKey(User, on_delete=models.CASCADE) | |
otp = models.ForeignKey(AuthOTP, on_delete=models.CASCADE) | |
# a way to group OTPs into malleable categories | |
reason = models.CharField( | |
max_length=64, | |
choices=OTPReasons.choices, | |
default=OTPReasons.ACTIVATE_USER, | |
) | |
def __str__(self) -> str: | |
return f"{self.user} | {self.reason}" | |
class Meta: | |
# we add a composite index made up of the User table and the | |
# specified reason. This means that only one `UserAuthOTP` | |
# instance can exist for a user per reason. Meanwhile, multiple | |
# otps may exist with only 0 or 1 valid at any point in time | |
# for a particular `reason`. | |
constraints = [ | |
models.UniqueConstraint( | |
fields=["reason", "user"], name="unique_reason_for_user" | |
) | |
] | |
@classmethod | |
def create_otp_for_user( | |
cls, user: User, reason: OTPReasons | |
) -> CreateUserAuthOTPType: | |
""" | |
This is the expected method to be used to create an OTP | |
for a user. It yields both the auth_otp instance and | |
the generated code. Again, the generated code will only be | |
available here. If it isn't stored in a variable, it is | |
lost forever. | |
""" | |
otp = AuthOTP.generate_otp() | |
try: | |
# if a `UserAuthOTP` instance already exists for the reason, | |
# we reuse it. | |
user_auth_otp = cls.objects.get(user=user, reason=reason) | |
except cls.DoesNotExist: | |
# if not, we create one for the reason | |
user_auth_otp = UserAuthOTP(user=user, reason=reason) | |
user_auth_otp.otp = otp["instance"] | |
user_auth_otp.save() | |
return {"user_auth_otp": user_auth_otp, "complete_otp": otp} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
The process of resetting password password is in three stages: | |
1. Initialize password reset: At this stage, the OTP is created and sent to the email address of the user specified if | |
the user exists in the database. The OTP sent is of type "FORGOT_PASSWORD". | |
2. Receive OTP from user: In this stage, the client sends the OTP typed in by the user for verification. The OTP code, reason | |
and user are all validated to be correct: | |
- The otp must be correct | |
- The otp must not have expired | |
- The otp must belong to the user | |
- The otp must be for the right reasons | |
The server creates another OTP (basically a temporary token) to store the state of this password reset. This token expires | |
really quickly and is hidden from the user. The client is to submit this token with the new password of the user. | |
The reason for this process is: | |
- To maintain a form of state for the password reset process | |
- To ensure the client has a way of immediately verifying the OTP sent by the user | |
- To ensure security by invalidating existing password reset processes by malicious users. If during stage 2, another password | |
reset process is also in stage 2, the most recent lives and the oldest is invalidated. | |
3. Complete password reset: The token from stage 2 is sent by the client alongside the new password. The token is validated | |
also based on the criteria in stage 2. The user's password is reset at this point. | |
""" | |
# stage 1: | |
from datetime import timedelta | |
from typing import Dict, Optional | |
from rest_framework import serializers, generics, response, status, views | |
from .models import AuthOTP, User, UserAuthOTP | |
def convert_duration_to_minutes(duration: timedelta) -> int: | |
return int((duration.total_seconds() / 5)) | |
class InitializePasswordResetSerializer(serializers.Serializer): | |
""" | |
Serializer handling stage one of the process | |
""" | |
EMAIL_SUBJECT = "Reset your account password" | |
EMAIL_MESSAGE = ( | |
"Use this code to reset your account password:\n" | |
"{}\n" | |
"This code will expire in {} minutes." | |
) | |
EMAIL_SENDER = "App <[email protected]>" | |
email = serializers.EmailField() | |
def validate_email(self, value) -> Optional[User]: | |
try: | |
user = User.objects.get(email=value) | |
except User.DoesNotExist: | |
return | |
return user | |
def save(self): | |
user: Optional[User] = self.validated_data["email"] | |
if not user: | |
return | |
user_auth_otp = UserAuthOTP.create_otp_for_user( | |
user, UserAuthOTP.OTPReasons.FORGOT_PASSWORD | |
) | |
user.email_user( | |
self.EMAIL_SUBJECT, | |
self.EMAIL_MESSAGE.format( | |
user_auth_otp["complete_otp"]["code"], | |
convert_duration_to_minutes( | |
user_auth_otp["complete_otp"]["instance"].timeout | |
), | |
), | |
self.EMAIL_SENDER, | |
) | |
class InitializePasswordResetAPIView(generics.GenericAPIView): | |
""" | |
Endpoint handling stage one of the process | |
""" | |
serializer_class = InitializePasswordResetSerializer | |
def post(self, request, *args, **kwargs): | |
serializer = self.serializer_class(data=request.data) | |
serializer.is_valid(raise_exception=True) | |
serializer.save() | |
return response.Response(status=status.HTTP_204_NO_CONTENT) | |
# stage 2 | |
class ReceiveOTPForPasswordResetSerializer(serializers.Serializer): | |
""" | |
Serializer handling stage two of the process | |
""" | |
ERROR_MESSAGE = "OTP is invalid" | |
email = serializers.EmailField(write_only=True) | |
code = serializers.IntegerField(read_only=True) | |
otp = serializers.IntegerField(write_only=True) | |
def validate(self, attrs): | |
verified = False | |
try: | |
user: User = User.objects.get(email=attrs["email"]) | |
# we're setting context here only because | |
# we want to raise this error inside the serializer | |
self.context["user"] = user | |
user_auth_otp = UserAuthOTP.objects.get( | |
user=user, reason=UserAuthOTP.OTPReasons.FORGOT_PASSWORD | |
) | |
verified = AuthOTP.verify_otp(user_auth_otp.otp, int(attrs["otp"])) | |
except (User.DoesNotExist, UserAuthOTP.DoesNotExist): | |
raise serializers.ValidationError(self.ERROR_MESSAGE) | |
if not verified: | |
raise serializers.ValidationError(self.ERROR_MESSAGE) | |
return attrs | |
@transaction.atomic | |
def save(self): | |
user_auth_otp = UserAuthOTP.create_otp_for_user( | |
self.context["user"], UserAuthOTP.OTPReasons.FORGOT_PASSWORD_TOKEN | |
) | |
UserAuthOTP.objects.get( | |
user=self.context["user"], | |
reason=UserAuthOTP.OTPReasons.FORGOT_PASSWORD, | |
).delete() | |
return {"code": user_auth_otp["complete_otp"]["code"]} | |
class ReceiveOTPForPasswordResetAPIView(generics.CreateAPIView): | |
""" | |
Endpoint handling stage two of the process | |
""" | |
serializer_class = ReceiveOTPForPasswordResetSerializer | |
def post(self, request, *args, **kwargs): | |
serializer = self.serializer_class(data=request.data) | |
serializer.is_valid(raise_exception=True) | |
result = serializer.save() | |
return response.Response( | |
result, | |
status=status.HTTP_200_OK, | |
) | |
# stage 3 | |
class CompletePasswordResetSerializer(serializers.Serializer): | |
""" | |
Serializer handling stage three of the process | |
""" | |
ERROR_MESSAGE = "Password reset request invalid" | |
code = serializers.IntegerField(write_only=True) | |
email = serializers.EmailField() | |
password = serializers.CharField(min_length=8, write_only=True) | |
def validate(self, attrs): | |
verified = False | |
try: | |
user: User = User.objects.get(email=attrs["email"]) | |
# we're setting context here only because | |
# we want to raise this error inside the serializer | |
self.context["user"] = user | |
user_auth_otp = UserAuthOTP.objects.get( | |
user=user, reason=UserAuthOTP.OTPReasons.FORGOT_PASSWORD_TOKEN | |
) | |
verified = AuthOTP.verify_otp(user_auth_otp.otp, attrs["code"]) | |
except (User.DoesNotExist, UserAuthOTP.DoesNotExist) as e: | |
raise serializers.ValidationError(self.ERROR_MESSAGE) | |
if not verified: | |
raise serializers.ValidationError(self.ERROR_MESSAGE) | |
return attrs | |
def save(self): | |
user = self.context["user"] | |
user.set_password(self.validated_data["password"]) | |
user.save() | |
user_auth_otp = UserAuthOTP.objects.get( | |
user=user, reason=UserAuthOTP.OTPReasons.FORGOT_PASSWORD_TOKEN | |
) | |
user_auth_otp.delete() | |
class CompletePasswordResetAPIView(generics.GenericAPIView): | |
""" | |
Endpoint handling stage three of the process | |
""" | |
serializer_class = CompletePasswordResetSerializer | |
def post(self, request, *args, **kwargs): | |
serializer = self.serializer_class(data=request.data) | |
serializer.is_valid(raise_exception=True) | |
serializer.save() | |
return response.Response( | |
{"message": "Password reset successfully"}, | |
status=status.HTTP_200_OK, | |
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Two APIs are called to verify a user: | |
1. The `InitializeActivateUserAPIView` is first called to send OTP to the user's | |
email address (phone numbers can be used with Twilio too). This endpoint handles checks on whether the user | |
has been verified before or not. It returns a 204 response if the OTP is sent and a 403 if the user is already verified. | |
2. The `CompleteUserVerificationProcessAPIView` is used to verify the code sent by the user through the client and | |
activates the user if everything is right: | |
- The otp sent is correct | |
- The otp hasn't expired | |
- The otp is for the right reason | |
""" | |
from typing import Any, Dict, Optional | |
from django.http import HttpRequest, HttpResponsePermanentRedirect | |
from django.shortcuts import redirect | |
from rest_framework import generics, response, status, views | |
from rest_framework.permissions import IsAuthenticated | |
from .models import AuthOTP, UserAuthOTP, User | |
class InitializeActivateUserAPIView(generics.CreateAPIView): | |
""" | |
This is the first endpoint to be called. | |
""" | |
EMAIL_SUBJECT = "Activate your account" | |
EMAIL_MESSAGE = ( | |
"Use this code to activate your account:\n" | |
"{}\n" | |
"This code will expire in {} minutes." | |
) | |
EMAIL_SENDER = "App <[email protected]>" | |
permission_classes = [IsAuthenticated] | |
def process_verification_initialization(self): | |
user: User = self.request.user | |
if user.is_verified: | |
return False | |
# here, we simply create an OTP for activating the user | |
# using the functionalities in the otp module. If there is | |
# an existing OTP that hasn't been used and is still live, | |
# it is detached and there by invalidated here. | |
user_auth_otp = UserAuthOTP.create_otp_for_user( | |
self.request.user, UserAuthOTP.OTPReasons.ACTIVATE_USER | |
) | |
# the `AbstractUser` which the `User` model inherits | |
# implements methods like `email_user` which can be used | |
# to send emails to the user. | |
# this method call may not run due to some unforseen circumstances | |
# from mailgun or ourselves. We are not catching this exception | |
# so we can investigate it in case it happens. | |
user.email_user( | |
self.EMAIL_SUBJECT, | |
self.EMAIL_MESSAGE.format( | |
user_auth_otp["complete_otp"]["code"], | |
convert_duration_to_minutes( | |
user_auth_otp["complete_otp"]["instance"].timeout | |
), | |
), | |
self.EMAIL_SENDER, | |
) | |
return True | |
def post(self, request, *args, **kwargs): | |
if self.process_verification_initialization(): | |
return response.Response(status=status.HTTP_204_NO_CONTENT) | |
return response.Response( | |
data={"details": "User already verified"}, | |
status=status.HTTP_403_FORBIDDEN, | |
) | |
from rest_framework import status | |
from rest_framework.exceptions import APIException | |
class InvalidOTP(APIException): | |
""" | |
A custom exception wrapper over the 422 status code | |
""" | |
status_code = status.HTTP_422_UNPROCESSABLE_ENTITY | |
default_detail = ( | |
"This OTP is either: invalid, already used or already expired." | |
) | |
default_code = "wrong_otp" | |
from typing import Dict, Optional | |
from django.db import transaction | |
from rest_framework import serializers | |
from .models import AuthOTP, User, UserAuthOTP | |
class CompleteVerificationProcessSerializer(serializers.Serializer): | |
""" | |
This is simply a class that serializes input from the client and processes | |
the data to perform actions on behalf of the endpoint. | |
""" | |
code = serializers.IntegerField() | |
def validate_code(self, value: int): | |
try: | |
# first we look for the `UserAuthOTP` in the database | |
user_auth_otp = UserAuthOTP.objects.get( | |
user=self.context["request"].user, | |
reason=UserAuthOTP.OTPReasons.ACTIVATE_USER, | |
) | |
except UserAuthOTP.DoesNotExist: | |
# we can't find it! | |
raise InvalidOTP | |
# next we verify if the code submitted by the user is | |
# correct | |
if not AuthOTP.verify_otp(user_auth_otp.otp, value): | |
raise exceptions.InvalidOTP | |
return user_auth_otp | |
# atomic so it rollsback in case something goes wrong | |
@transaction.atomic | |
def save(self): | |
user: User = self.context["request"].user | |
user.is_verified = True | |
user.save() | |
self.validated_data["code"].delete() | |
return | |
class CompleteUserVerificationProcessAPIView(generics.CreateAPIView): | |
""" | |
This endpoint relies on the serializer above to do the main work | |
""" | |
permission_classes = [IsAuthenticated] | |
serializer_class = CompleteVerificationProcessSerializer | |
def post(self, request, *args, **kwargs): | |
serializer = self.serializer_class( | |
data=request.data, context={"request": request} | |
) | |
# this line runs the code in `validate_code` for us | |
serializer.is_valid(raise_exception=True) | |
serializer.save() | |
return response.Response( | |
{"message": "User successfully verified"}, | |
status=status.HTTP_200_OK, | |
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.contrib.auth.models import AbstractUser | |
from django.conf import settings | |
from django.db import models | |
from utils.models import TrackObjectStateMixin | |
class User(AbstractUser, TrackObjectStateMixin): | |
email = models.EmailField(unique=True) | |
is_verified = models.BooleanField(default=False, index=True) | |
USERNAME_FIELD = "email" | |
REQUIRED_FIELDS = ["username"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uuid | |
from django.db import models | |
class TrackObjectStateMixin(models.Model): | |
uuid = models.UUIDField(default=uuid.uuid4, unique=True) | |
created = models.DateTimeField(auto_now_add=True) | |
last_updated = models.DateTimeField(auto_now=True) | |
class Meta: | |
abstract = True |
Very robust implementation chief, well done.
Thank you 🔥
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Very robust implementation chief, well done.