Created
May 8, 2020 20:06
-
-
Save mjlavin80/c3784628b863791fc71ca5a8e4dd8c09 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import hmac
import json
import os
from functools import wraps
from time import time

import requests
from flask import Flask, request
from flask_restplus import Resource, Api, abort, fields, inputs, reqparse
from flask_sqlalchemy import SQLAlchemy
from itsdangerous import SignatureExpired, JSONWebSignatureSerializer, BadSignature
class AuthenticationToken:
    """Issue and verify signed tokens carrying a username and a creation time.

    Tokens are produced by a ``JSONWebSignatureSerializer`` keyed with
    ``secret_key``. Expiry is enforced by this class: the creation timestamp
    stored in the payload is compared with the current time on validation.
    """

    def __init__(self, secret_key, expires_in):
        """Store the signing key and the maximum token age.

        Args:
            secret_key: Key handed to the serializer for signing.
            expires_in: Maximum accepted token age, in the same units as
                ``time()`` differences.
        """
        self.secret_key = secret_key
        self.expires_in = expires_in
        self.serializer = JSONWebSignatureSerializer(secret_key)

    def generate_token(self, username):
        """Return a signed token string for ``username``, stamped with now."""
        payload = {'username': username, 'creation_time': time()}
        signed = self.serializer.dumps(payload)
        # The serializer output is decoded so callers receive text.
        return signed.decode()

    def validate_token(self, token):
        """Return the username stored in ``token``.

        Raises:
            SignatureExpired: if the token is older than ``expires_in``.
            Whatever the serializer's ``loads`` raises for a token it rejects.
        """
        payload = self.serializer.loads(token.encode())
        age = time() - payload['creation_time']
        if age > self.expires_in:
            raise SignatureExpired("The Token has been expired; get a new token")
        return payload['username']
# Signing key for authentication tokens. Read from the SECRET_KEY environment
# variable when set; the literal is only a development fallback and should not
# be relied on in a deployed service because it is committed to source.
SECRET_KEY = os.environ.get("SECRET_KEY", "f4b58245-6fd4-4bce-a8a4-27ca37370a3c")
# Maximum token age; AuthenticationToken compares it against time() deltas.
expires_in = 600
auth = AuthenticationToken(SECRET_KEY, expires_in)

app = Flask(__name__)
# Every documented endpoint advertises an API key passed in the AUTH-TOKEN
# request header.
api = Api(
    app,
    authorizations={
        'API-KEY': {
            'type': 'apiKey',
            'in': 'header',
            'name': 'AUTH-TOKEN',
        },
    },
    security='API-KEY',
    default="hello_world",
    title="test api",
    description="test doc description",
)
db = SQLAlchemy(app)

# Fake user store (username -> password). Defined at import time so the /token
# handlers also work when the app is served by a WSGI runner rather than run
# as a script; the __main__ guard assigns the same seed data.
users = {"admin": "somepassword", "jimmy": "someotherpassword"}
def requires_auth(f):
    """Decorate a view so it only runs with a valid ``AUTH-TOKEN`` header.

    The wrapper reads the ``AUTH-TOKEN`` request header and validates it with
    the module-level ``auth`` object. The request is aborted with HTTP 401
    when the header is missing or empty, or when validation raises
    ``SignatureExpired`` or ``BadSignature``; otherwise ``f`` is called with
    the original arguments and its result is returned.

    Args:
        f: The view callable to protect.

    Returns:
        The wrapped callable, carrying ``f``'s metadata via ``functools.wraps``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('AUTH-TOKEN')
        if not token:
            abort(401, 'Authentication token is missing')
        try:
            # Only the side effect (raising on a bad token) matters here; the
            # username returned by validate_token is not used by the wrapper.
            auth.validate_token(token)
        except (SignatureExpired, BadSignature) as e:
            abort(401, e.message)
        return f(*args, **kwargs)
    return decorated
# Documentation model describing the credential pair used by /token.
credential_model = api.model(
    'credential',
    dict(username=fields.String, password=fields.String),
)

# Parser declaring the same two string arguments; it is passed to
# ``api.expect`` on the /token handlers.
credential_parser = reqparse.RequestParser()
credential_parser.add_argument('username', type=str)
credential_parser.add_argument('password', type=str)
@api.route('/token')
class Token(Resource):
    """Credential endpoint: GET issues a token, POST registers a user.

    Both handlers read ``username`` and ``password`` from the query string and
    use the module-level ``users`` mapping (username -> password) as storage.
    """

    @api.response(200, 'Successful')
    @api.response(401, 'Wrong password')
    @api.response(404, 'Unknown username')
    @api.doc(description="Generates a authentication token")
    @api.expect(credential_parser, validate=True)
    def get(self):
        """Return ``{"token": ...}`` for a known user with a matching password.

        Aborts with 404 when the username is missing or not in ``users`` and
        with 401 when the password is missing or does not match.
        """
        username = request.args.get('username')
        password = request.args.get('password')
        # Unknown usernames must be rejected before indexing ``users``;
        # otherwise the lookup raises KeyError and surfaces as a 500.
        if not username or username not in users:
            api.abort(404, "Username: {} doesn't exist".format(username))
        # Constant-time comparison on bytes (compare_digest rejects non-ASCII
        # str) so the check does not leak how much of the password matched.
        if password is None or not hmac.compare_digest(
                password.encode(), users[username].encode()):
            api.abort(401, "Wrong password")
        return {"token": auth.generate_token(username)}

    @api.response(200, 'Successful')
    @api.response(400, 'Missing credentials or username already registered')
    @api.doc(description="Registers a new user")
    @api.expect(credential_parser, validate=True)
    def post(self):
        """Register ``username`` with ``password`` in ``users``.

        Returns a message payload with status 200 on success, or status 400
        when a credential is missing or the username is already registered.
        """
        username = request.args.get('username')
        password = request.args.get('password')
        if not username or password is None:
            return {"message": "username and password are required"}, 400
        # Assigning into a dict never raises, so an existing entry has to be
        # detected explicitly or it would be silently overwritten.
        if username in users:
            return {"message" : "{} has already been signed".format(username)}, 400
        users[username] = password
        return {
            "message" : "{} Register Successfully".format(username),
            "prediction_id" : username
        }, 200
# Namespace that hosts the /hello resource.
ns = api.namespace('ns')

# Documentation model for the four fields the /hello endpoint expects.
payload = api.model(
    'Payload',
    dict(
        score=fields.Integer,
        category=fields.String,
        guidance=fields.String,
        is_ready=fields.Boolean,
    ),
)
@ns.route('/hello')
class AResource(Resource):
    """Echo endpoint: parses the ``Payload`` fields and returns them as JSON."""

    # Built once at class creation instead of on every request. Argument types
    # mirror the ``Payload`` model: ``score`` is an integer and ``is_ready``
    # goes through ``inputs.boolean`` because plain ``bool`` treats any
    # non-empty string (including "false") as True.
    _parser = reqparse.RequestParser()
    _parser.add_argument('score', type=int, required=True)
    _parser.add_argument('category', type=str, required=True)
    _parser.add_argument('guidance', type=str, required=True)
    _parser.add_argument('is_ready', type=inputs.boolean, required=True)

    @ns.expect(payload)
    def get(self):
        """Return the parsed arguments as a plain dict.

        ``parse_args`` itself aborts the request with a 400 response when a
        required argument is missing or cannot be converted, so no extra error
        handling is needed here.
        """
        args = self._parser.parse_args()
        # A plain dict is serialised by the API layer; no ``jsonify`` needed.
        return dict(args)
if __name__ == '__main__':
    # NOTE(review): ``ns`` was created through ``api.namespace``; confirm
    # whether this explicit registration is still required.
    api.add_namespace(ns)
    # Fake database: username -> password, read and written by /token.
    users = dict(admin="somepassword", jimmy="someotherpassword")
    # Start Flask's built-in server with debug mode switched on.
    app.run(debug=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment