Example of authentication mutations and user queries in GraphQL API.
Last active
February 8, 2020 14:37
-
-
Save tricoder42/3725703a31ee4b37f1d01f6221e96c41 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
First we need to create a type from our database models (Django in this case) | |
""" | |
import graphene | |
from graphene_django import DjangoObjectType | |
from ..models import User | |
class UserType(DjangoObjectType): | |
full_name = graphene.String() | |
photo = graphene.String() | |
class Meta: | |
model = User | |
only_fields = [ | |
'id', 'key', | |
'first_name', 'last_name', 'full_name', | |
'phone_number', 'mobile_number', 'email', 'photo', 'role', | |
'is_active', 'last_login' | |
] | |
@staticmethod | |
def resolve_photo(obj, info): | |
if not obj.photo: | |
return None | |
return obj.photo.url | |
@staticmethod | |
def resolve_full_name(obj, info): | |
return obj.get_full_name() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Queries are like views in REST framework - they return a list of UserTypes (list of all users) | |
or a single UserType (e.g. currently authenticated user) | |
""" | |
import graphene | |
from django.contrib.auth import get_user_model | |
from .decorators import login_required | |
from .types import UserType | |
User = get_user_model() | |
class AuthUserQuery: | |
""" | |
Currently authenticated user or None if user is anonymous | |
""" | |
auth_user = graphene.Field(UserType) | |
@classmethod | |
def resolve_auth_user(cls, obj, info): | |
request = info.context | |
if not request or not request.user or not request.user.is_authenticated: | |
return None | |
return request.user | |
class UserQuery: | |
users = graphene.List(UserType, project=graphene.String()) | |
@classmethod | |
@login_required | |
def resolve_users(cls, obj, info): | |
return User.objects.all() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Example mutations used for authentication | |
""" | |
import smtplib | |
import graphene | |
from django.conf import settings | |
from django.contrib.auth import authenticate, login, logout | |
from django.core.mail import send_mail | |
from django.db import IntegrityError | |
from .models import User | |
from .decorators import login_required | |
from .mutations import BaseMutation | |
from .types import UserType | |
class Login(BaseMutation): | |
class Arguments: | |
email = graphene.String() | |
password = graphene.String() | |
keep = graphene.Boolean() | |
user = graphene.Field(UserType) | |
@classmethod | |
def mutate(cls, obj, info, *, email, password, keep=False): | |
request = info.context | |
user = authenticate(request, email=email, password=password) | |
if user is None: | |
return cls(user=None, ok=False, error='Invalid email or password') | |
if not keep: | |
request.session.set_expiry(0) | |
login(request, user) | |
return cls(user=user) | |
class Logout(BaseMutation): | |
@classmethod | |
@login_required | |
def mutate(cls, obj, info): | |
request = info.context | |
logout(request) | |
return cls() | |
class Register(BaseMutation): | |
class Arguments: | |
email = graphene.String() | |
password = graphene.String() | |
user = graphene.Field(UserType) | |
@classmethod | |
def mutate(cls, obj, info, *, email, password): | |
email = email.strip() | |
try: | |
user = User.objects.create_user( | |
email=email, password=password) | |
except IntegrityError: | |
return cls(user=None, ok=False, error='Email already registered') | |
return cls(user=user) | |
class PasswordResetRequest(BaseMutation): | |
class Arguments: | |
email = graphene.String() | |
token = graphene.String() | |
@staticmethod | |
def send_mail(email, token): | |
msg = ('You\'ve requested password reset at https://{domain_web}/. ' | |
'Please visit https://{domain_web}/password/new/?verify={token}' | |
' and set your new password.' | |
).format(domain_web=settings.DOMAIN_WEB, token=token) | |
send_mail( | |
'Password reset verification', | |
msg, | |
settings.DEFAULT_FROM_EMAIL, | |
[email], | |
) | |
@classmethod | |
def mutate(cls, obj, info, *, email): | |
token = User.objects.create_reset_password_token(email) | |
if not token: | |
return cls(ok=False, error='Invalid email', token=None) | |
try: | |
PasswordResetRequest.send_mail(email, token) | |
except smtplib.SMTPException: | |
return cls(ok=False, error='Email not sent') | |
return cls(token=token) | |
class PasswordReset(BaseMutation): | |
class Arguments: | |
token = graphene.String() | |
password = graphene.String() | |
@staticmethod | |
def send_mail(email): | |
msg = ('Your password has changed. ' | |
'Please visit https://{domain_web}/ and verify.' | |
).format(domain_web=settings.DOMAIN_WEB) | |
send_mail( | |
'Password reset confirmation', | |
msg, | |
settings.DEFAULT_FROM_EMAIL, | |
[email], | |
) | |
@classmethod | |
def mutate(cls, obj, info, *, token, password): | |
user = User.objects.verify_reset_password_token(token) | |
if not user: | |
return cls(ok=False, error='Invalid token') | |
try: | |
PasswordReset.send_mail(user.email) | |
except smtplib.SMTPException: | |
# Token is already invalidated at this point, but email with | |
# confirmation hasn't been sent. Don't change password silently, | |
# rather request user to generate new token. | |
return cls(ok=False, error='Invalid token') | |
user.set_password(password) | |
user.save(update_fields=['password']) | |
return cls() | |
class AdminPasswordChange(BaseMutation): | |
""" | |
Change password for *any* user in database. Requires admin privileges. | |
""" | |
class Arguments: | |
email = graphene.String() | |
password = graphene.String() | |
@classmethod | |
def mutate(cls, obj, info, *, email, password): | |
if not info.context.user.is_admin: | |
return cls(ok=False, error='Not authorized') | |
user = User.objects.get(email__iexact=email) | |
user.set_password(password) | |
user.save(update_fields=['password']) | |
return cls() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
finally, we define the schema by listing queries and mutations (types are embedded automatically) | |
""" | |
from .queries import AuthUserQuery, UserQuery | |
from .mutations import ( | |
Login, Logout, Register, PasswordResetRequest, PasswordReset, | |
AdminPasswordChange | |
) | |
class Query(AuthUserQuery, UserQuery): | |
pass | |
class Mutations: | |
auth_login = Login.Field() | |
auth_logout = Logout.Field() | |
auth_register = Register.Field() | |
auth_password_reset_request = PasswordResetRequest.Field() | |
auth_password_reset = PasswordReset.Field() | |
auth_password_change = AdminPasswordChange.Field() | |
schema = graphene.Schema( | |
query=Query, | |
mutation=Mutations) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import graphene | |
def authorization_required(auth_func): | |
""" | |
Factory for authorization decorators. Pass `auth_func` which returns when | |
authorized to perform action, False otherwise. `auth_func` receives | |
request and current object as a param. | |
""" | |
def decorator(function): | |
def wrapper(cls, obj, info, **kwargs): | |
if not auth_func(info.context, obj): | |
if issubclass(cls, graphene.Mutation): | |
return cls(ok=False, error='Not authorized') | |
else: | |
return None | |
return function(cls, obj, info, **kwargs) | |
return wrapper | |
return decorator | |
def is_authenticated(request, obj): | |
return request and request.user and request.user.is_authenticated | |
login_required = authorization_required(is_authenticated) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment