Created
May 11, 2019 14:02
-
-
Save KentaYamada/140137cd389c9eb2244beb1f72e79116 to your computer and use it in GitHub Desktop.
Python3 flask JWT authorization example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import jsonify, Flask | |
from flask_jwt import jwt_required, current_identity, JWT | |
class User: | |
def __init__(self, id, username, password): | |
self.id = id | |
self.username = username | |
self.password = password | |
users = [User(i, 'user{}'.format(i), 'ab{}cd'.format(i)) for i in range(1, 3)] | |
def authoricate(username, password): | |
""" callback auth api """ | |
target = next((user for user in users if user.username == username), None) | |
is_auth = True if target is not None and target.password == password else False | |
return target if is_auth else None | |
def identity(payload): | |
""" current_identity callback """ | |
user_id = payload['identity'] | |
target = next((user for user in users if user.id == user_id), None) | |
return target | |
app = Flask(__name__) | |
app.debug = True | |
app.config['SECRET_KEY'] = 'develop' | |
app.config['JWT_AUTH_URL_RULE'] = '/api/auth' | |
jwt = JWT(app, authoricate, identity) | |
@app.route('/api/protected', methods=['GET']) | |
@jwt_required() | |
def protected(): | |
return jsonify({'result': True}) | |
if __name__ == '__main__': | |
app.run() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import unittest | |
from server import app, authoricate, identity | |
class TestJwtServer(unittest.TestCase): | |
CONTENT_TYPE = 'application/json' | |
AUTH_ENDPOINT = '/api/auth' | |
PROTECTED_ENDPOINT = '/api/protected' | |
@classmethod | |
def setUpClass(cls): | |
cls.client = app.test_client() | |
@classmethod | |
def tearDownClass(cls): | |
cls.client = None | |
def test_authoricate(self): | |
result = authoricate('user1', 'ab1cd') | |
self.assertIsNotNone(result) | |
self.assertEqual(result.username, 'user1') | |
self.assertEqual(result.password, 'ab1cd') | |
def test_identity(self): | |
result = identity({'identity': 1}) | |
self.assertIsNotNone(result) | |
self.assertEqual(result.username, 'user1') | |
self.assertEqual(result.password, 'ab1cd') | |
def test_auth_ok(self): | |
data = json.dumps({ | |
'username': 'user1', | |
'password': 'ab1cd' | |
}) | |
response = self.client.post( | |
self.AUTH_ENDPOINT, | |
content_type=self.CONTENT_TYPE, | |
data=data | |
) | |
self.assertEqual(200, response.status_code) | |
def test_protected_when_after_login(self): | |
data = json.dumps({ | |
'username': 'user1', | |
'password': 'ab1cd' | |
}) | |
response = self.client.post( | |
self.AUTH_ENDPOINT, | |
content_type=self.CONTENT_TYPE, | |
data=data | |
) | |
self.assertEqual(200, response.status_code) | |
response_data = json.loads(response.data) | |
access_token = response_data.get('access_token', '') | |
header_data = { | |
'Authorization': 'JWT {}'.format(access_token) | |
} | |
response2 = self.client.get( | |
self.PROTECTED_ENDPOINT, | |
headers=header_data | |
) | |
response_data2 = json.loads(response2.data) | |
self.assertEqual(200, response2.status_code) | |
self.assertTrue(response_data2.get('result', False)) | |
def test_protected_when_no_auth_request(self): | |
response = self.client.get(self.PROTECTED_ENDPOINT) | |
self.assertEqual(401, response.status_code) | |
if __name__ == '__main__': | |
unittest.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment