Created
February 7, 2013 02:44
-
-
Save KazuyaHayashi/4727985 to your computer and use it in GitHub Desktop.
utility for JWT.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import json | |
import urllib | |
import atom.http_core | |
from oauth2client import client | |
class Error(Exception): | |
def __str__(self): | |
return "Error: %s" % self.error_message | |
class OAuth2JWTError(Error): | |
"""Raised when an OAuth2 error occurs.""" | |
def __init__(self, error_message): | |
self.error_message = error_message | |
def get_service_private_key(): | |
f = open('service_privatekey.p12','rb') | |
key = f.read() | |
f.close() | |
return key | |
def get_clientSecrets(): | |
f = open('service_client_secrets.json') | |
secrets_json = f.read() | |
f.close() | |
return json.loads(secrets_json) | |
def get_JWT(assertion): | |
body = urllib.urlencode({ | |
'grant_type':'urn:ietf:params:oauth:grant-type:jwt-bearer', | |
'assertion':assertion | |
} | |
)_ | |
headers = {} | |
http_client = atom.http_core.HttpClient() | |
http_request = atom.http_core.HttpRequest(uri="https://accounts.google.com/o/oauth2/token", | |
method="POST", headers=headers) | |
http_request.add_body_part(data=body, mime_type="application/x-www-form-urlencoded") | |
response = http_client.Request(http_request) | |
body = response.read() | |
if response.status == 200: | |
return body | |
else: | |
error_msg = 'Invalid response %s.' % response.status | |
try: | |
d = json.loads(body) | |
if 'error' in d: | |
error_msg = d['error'] | |
except: | |
pass | |
raise OAuth2JWTError(error_msg) | |
def getServiceAccountsAccessToken(user): | |
client_secrets = get_clientSecrets() | |
private_key = get_service_private_key() | |
scope = [ | |
'https://mail.google.com/', | |
'https://apps-apis.google.com/a/feeds/user/', | |
'https://apps-apis.google.com/a/feeds/groups/', | |
] | |
#jwt_client = client.SignedJwtAssertionCredentials( | |
# service_account_name=client_secrets['web']['client_email'], | |
# private_key=private_key, | |
# scope=scope, | |
# prn=user) | |
jwt_client = client.SignedJwtAssertionCredentials( | |
service_account_name=client_secrets['web']['client_email'], | |
private_key=private_key, | |
scope=scope) | |
jwt = json.loads(get_JWT(jwt_client._generate_assertion())) | |
access_token = jwt["access_token"] | |
return access_token | |
def GenerateOAuth2String(username, access_token, base64_encode=True): | |
"""Generates an IMAP OAuth2 authentication string. | |
See https://developers.google.com/google-apps/gmail/oauth2_overview | |
Args: | |
username: the username (email address) of the account to authenticate | |
access_token: An OAuth2 access token. | |
base64_encode: Whether to base64-encode the output. | |
Returns: | |
The SASL argument for the OAuth2 mechanism. | |
""" | |
auth_string = 'user=%s\1auth=Bearer %s\1\1' % (username, access_token) | |
if base64_encode: | |
auth_string = base64.b64encode(auth_string) | |
return auth_string |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment