Last active
October 5, 2022 12:12
-
-
Save gajanan0707/fefe696aed5b2b2fd3542e802216f0c8 to your computer and use it in GitHub Desktop.
FLASK_DEMO_REST/user/service,py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import jwt | |
import datetime | |
from server import db | |
from os import environ | |
from users.helper import send_forgot_password_email | |
from users.models import User | |
from flask_bcrypt import generate_password_hash | |
from utils.common import generate_response, TokenGenerator | |
from users.validation import ( | |
CreateLoginInputSchema, | |
CreateResetPasswordEmailSendInputSchema, | |
CreateSignupInputSchema, ResetPasswordInputSchema, | |
) | |
from utils.http_code import HTTP_200_OK, HTTP_201_CREATED, HTTP_400_BAD_REQUEST | |
def create_user(request, input_data): | |
""" | |
It creates a new user | |
:param request: The request object | |
:param input_data: This is the data that is passed to the function | |
:return: A response object | |
""" | |
create_validation_schema = CreateSignupInputSchema() | |
errors = create_validation_schema.validate(input_data) | |
if errors: | |
return generate_response(message=errors) | |
check_username_exist = User.query.filter_by( | |
username=input_data.get("username") | |
).first() | |
check_email_exist = User.query.filter_by(email=input_data.get("email")).first() | |
if check_username_exist: | |
return generate_response( | |
message="Username already exist", status=HTTP_400_BAD_REQUEST | |
) | |
elif check_email_exist: | |
return generate_response( | |
message="Email already taken", status=HTTP_400_BAD_REQUEST | |
) | |
new_user = User(**input_data) # Create an instance of the User class | |
new_user.hash_password() | |
db.session.add(new_user) # Adds new User record to database | |
db.session.commit() # Comment | |
del input_data["password"] | |
return generate_response( | |
data=input_data, message="User Created", status=HTTP_201_CREATED | |
) | |
def login_user(request, input_data): | |
""" | |
It takes in a request and input data, validates the input data, checks if the user exists, checks if | |
the password is correct, and returns a response | |
:param request: The request object | |
:param input_data: The data that is passed to the function | |
:return: A dictionary with the keys: data, message, status | |
""" | |
create_validation_schema = CreateLoginInputSchema() | |
errors = create_validation_schema.validate(input_data) | |
if errors: | |
return generate_response(message=errors) | |
get_user = User.query.filter_by(email=input_data.get("email")).first() | |
if get_user is None: | |
return generate_response(message="User not found", status=HTTP_400_BAD_REQUEST) | |
if get_user.check_password(input_data.get("password")): | |
token = jwt.encode( | |
{ | |
"id": get_user.id, | |
"email": get_user.email, | |
"username": get_user.username, | |
"exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=30), | |
}, | |
environ.get("SECRET_KEY"), | |
) | |
input_data["token"] = token | |
return generate_response( | |
data=input_data, message="User login successfully", status=HTTP_201_CREATED | |
) | |
else: | |
return generate_response( | |
message="Password is wrong", status=HTTP_400_BAD_REQUEST | |
) | |
def reset_password_email_send(request, input_data): | |
""" | |
It takes an email address as input, checks if the email address is registered in the database, and | |
if it is, sends a password reset email to that address | |
:param request: The request object | |
:param input_data: The data that is passed to the function | |
:return: A response object with a message and status code. | |
""" | |
create_validation_schema = CreateResetPasswordEmailSendInputSchema() | |
errors = create_validation_schema.validate(input_data) | |
if errors: | |
return generate_response(message=errors) | |
user = User.query.filter_by(email=input_data.get("email")).first() | |
if user is None: | |
return generate_response( | |
message="No record found with this email. please signup first.", | |
status=HTTP_400_BAD_REQUEST, | |
) | |
send_forgot_password_email(request, user) | |
return generate_response( | |
message="Link sent to the registered email address.", status=HTTP_200_OK | |
) | |
def reset_password(request, input_data, token): | |
create_validation_schema = ResetPasswordInputSchema() | |
errors = create_validation_schema.validate(input_data) | |
if errors: | |
return generate_response(message=errors) | |
if not token: | |
return generate_response( | |
message="Token is required!", | |
status=HTTP_400_BAD_REQUEST, | |
) | |
token = TokenGenerator.decode_token(token) | |
user = User.query.filter_by(id=token.get('id')).first() | |
if user is None: | |
return generate_response( | |
message="No record found with this email. please signup first.", | |
status=HTTP_400_BAD_REQUEST, | |
) | |
user = User.query.filter_by(id=token['id']).first() | |
user.password = generate_password_hash(input_data.get('password')).decode("utf8") | |
db.session.commit() | |
return generate_response( | |
message="New password SuccessFully set.", status=HTTP_200_OK | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment