Created
November 14, 2016 17:26
-
-
Save husio/26db6beaff5df317e258a305e8edb48f to your computer and use it in GitHub Desktop.
Service authentication in Python.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime, timedelta | |
import base64 | |
import json | |
from oauth2client.service_account import ServiceAccountCredentials | |
import httplib2 | |
class Authenticator:
    """Hand out a cached service JWT, refreshing it shortly before expiry.

    The token is obtained lazily through the module-level ``authenticate``
    function on first use and then reused until it is within
    ``REFRESH_MARGIN`` of its expiry time (or already past it).
    """

    # Renew this long before the recorded expiry so that a token handed to a
    # caller is not about to expire while the request is in flight.
    REFRESH_MARGIN = timedelta(minutes=10)

    def __init__(self, auth_service_url):
        """Remember the auth service base URL; no request is made yet."""
        self.auth_service_url = auth_service_url
        self._token = None
        # Naive datetime, compared against ``datetime.now()`` in ``jwt``.
        self._valid_till = None

    def headers(self):
        """Return dictionary that contains header fields required for the
        authentication process
        """
        # NOTE(review): the field is named "Authentication", not the standard
        # "Authorization" -- presumably what the target service expects;
        # confirm before changing.
        return {"Authentication": "Bearer " + self.jwt()}

    def jwt(self):
        """Return a usable token, fetching a new one when needed.

        A new token is requested when none is cached yet or when the cached
        one expires within ``REFRESH_MARGIN`` (this includes tokens that have
        already expired).
        """
        needs_refresh = (
            self._token is None
            or self._valid_till is None
            or self._valid_till <= datetime.now() + self.REFRESH_MARGIN
        )
        if needs_refresh:
            self._token, self._valid_till = authenticate(self.auth_service_url)
        return self._token
def authenticate(auth_service_url):
    """Fetch a token from the auth service and work out when it expires.

    The token is split into its three dot-separated segments; the header and
    claims segments are decoded and checked. The signature segment is not
    inspected here.

    Returns a ``(token, exp)`` tuple where ``exp`` is a naive local
    ``datetime``: the token's ``exp`` claim when present, otherwise five days
    from now.

    Raises ``CorruptedToken`` when the token cannot be split or decoded, and
    ``InvalidToken`` when the header declares a type other than ``JWT`` or an
    algorithm other than ``RS256``.
    """
    token = acquire_access_token(auth_service_url)

    try:
        raw_header, raw_claims, _ = token.split('.')
    except Exception as err:
        raise CorruptedToken("invalid format") from err

    try:
        header = load_b64_json(raw_header)
    except Exception as err:
        raise CorruptedToken("cannot decode header") from err
    # Valid JSON that is not an object (e.g. a list or number) has no .get();
    # report it as a corrupted token rather than leaking AttributeError.
    if not isinstance(header, dict):
        raise CorruptedToken("cannot decode header")

    if header.get("typ") != "JWT":
        raise InvalidToken("not JWT token")
    if header.get("alg") != "RS256":
        raise InvalidToken("signature algorithm not supported")

    try:
        claims = load_b64_json(raw_claims)
    except Exception as err:
        raise CorruptedToken("cannot decode claims") from err
    if not isinstance(claims, dict):
        raise CorruptedToken("cannot decode claims")

    if 'exp' in claims:
        try:
            exp = datetime.fromtimestamp(claims['exp'])
        except (TypeError, ValueError, OverflowError, OSError) as err:
            # Non-numeric or out-of-range expiry value.
            raise CorruptedToken("invalid exp claim") from err
    else:
        # No expiry claim: fall back to a fixed five-day lifetime.
        exp = datetime.now() + timedelta(days=5)
    return (token, exp)
def acquire_access_token(auth_service_url):
    """POST to ``<auth_service_url>/auth`` and return the issued access token.

    The request is authorized with the application default credentials,
    scoped to read-only cloud-platform access and the user's e-mail.

    Returns the value of the ``accessToken`` field of the JSON response body.

    Raises ``AuthenticatorException`` when the service answers with a
    non-2xx HTTP status.
    """
    credentials = ServiceAccountCredentials.get_application_default()
    credentials = credentials.create_scoped([
        "https://www.googleapis.com/auth/cloud-platform.read-only",
        "https://www.googleapis.com/auth/userinfo.email",
    ])
    http = credentials.authorize(httplib2.Http())
    response, body = http.request(auth_service_url + "/auth", "POST")
    # Fail with a clear error instead of trying to parse an error page as a
    # token response.
    if not 200 <= response.status < 300:
        raise AuthenticatorException(
            "auth service responded with HTTP status %d" % response.status)
    return json.loads(body.decode('utf8'))["accessToken"]
def load_b64_json(raw):
    """Decode one base64-encoded token segment and parse it as UTF-8 JSON.

    Padding stripped from the segment is restored before decoding. The
    URL-safe alphabet is used because JWT segments are base64url-encoded;
    the standard decoder would silently drop ``-`` and ``_`` and yield
    corrupted bytes. Input using the standard ``+``/``/`` alphabet still
    decodes as before.

    Raises whatever the base64, UTF-8 or JSON decoding step raises on
    malformed input.
    """
    data = base64.urlsafe_b64decode(fix_base64_padding(raw))
    return json.loads(data.decode('utf8'))
def fix_base64_padding(data):
    """Return *data* with ``=`` appended until its length is a multiple of 4.

    Input whose length is already a multiple of four is returned unchanged.
    """
    remainder = len(data) % 4
    if not remainder:
        return data
    return data + '=' * (4 - remainder)
class AuthenticatorException(Exception):
    """Base class for the errors raised by this module."""


class CorruptedToken(AuthenticatorException):
    """The token could not be split into segments or decoded."""


class InvalidToken(AuthenticatorException):
    """The token decoded, but its header declares an unsupported type or
    signature algorithm.
    """
if __name__ == "__main__":
    # Demo: build an authenticator for the dev auth service and print the
    # request headers it produces (this triggers a real token fetch).
    auth = Authenticator("https://auth-dot-opinary-dev.appspot.com")
    request_headers = auth.headers()
    print(request_headers)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment