Skip to content

Instantly share code, notes, and snippets.

@SalvaJ
Forked from s-shin/oauth2.py
Created March 19, 2014 09:56
Show Gist options
  • Save SalvaJ/9638710 to your computer and use it in GitHub Desktop.
Save SalvaJ/9638710 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""Sample OAuth2 client helper for Python 3.

The `OAuth2` class reads endpoints and client credentials from an ini file,
builds the URL used to obtain an authorization code, exchanges that code for
access/refresh tokens and caches the tokens back into the same ini file.
"""
__version__ = "0.1.0"
__author__ = "shin (shin.hateblo.jp)"
__copyright__ = "(C) 2012 shin"
__email__ = "[email protected]"
__license__ = "Apache License 2.0"
__status__ = "Prototype"
import configparser
import hashlib
import json
import urllib.error
import urllib.parse
import urllib.request
class OAuth2:
    """Helper for the OAuth2 authorization-code flow.

    Settings are read from an ini (cfg) file. The following options are
    required, either in ``[DEFAULT]`` or in the additional section:

    - authz_endpoint
    - token_endpoint
    - client_id
    - client_secret
    - redirect_uri
    - scope

    Below is a sample.
    ======================
    [DEFAULT]
    authz_endpoint = https://accounts.google.com/o/oauth2/auth
    token_endpoint = https://accounts.google.com/o/oauth2/token
    client_id = [Your Client ID]
    client_secret = [Your Client Secret]
    redirect_uri = urn:ietf:wg:oauth:2.0:oob
    scope = https://www.google.com/reader/atom https://www.google.com/reader/api
    ======================

    Tokens obtained from the token endpoint are cached in the same file, in a
    section named after the MD5 hex digest of the authorization endpoint and
    the client id.
    """

    AUTHZ_ENDPOINT = "authz_endpoint"
    TOKEN_ENDPOINT = "token_endpoint"
    CLIENT_ID = "client_id"
    CLIENT_SECRET = "client_secret"
    REDIRECT_URI = "redirect_uri"
    SCOPE = "scope"
    ACCESS_TOKEN = "access_token"
    REFRESH_TOKEN = "refresh_token"

    def __init__(self, configFile="oauth2.ini", additionalSection=None):
        """Load the configuration and any cached tokens.

        Arguments:
        configFile -- The config file path
        additionalSection -- Name of an extra section whose options override
            those of ``[DEFAULT]``. Ignored when the section does not exist.
        """
        self.configFile = configFile
        self.additionalSection = additionalSection
        self.__initConfiguration()  # init self.conf, self.orgConf
        self.__initCacheSection()  # init self.cacheSection
        self.accessToken = None
        self.refreshToken = None
        self.__loadCacheTokens()

    def __initConfiguration(self):
        """Load the config file into `self.orgConf` and the options into `self.conf`.

        Raises KeyError when a required option is missing.
        """
        parser = configparser.ConfigParser()
        with open(self.configFile, "r") as f:
            # `readfp` was removed in Python 3.12; `read_file` is its replacement.
            parser.read_file(f)
        self.orgConf = parser

        # Only use the additional section when it really exists. Falling back
        # to the bare name would turn the `in` test below into a substring
        # check on that name.
        override = None
        sectionName = self.additionalSection
        if sectionName is not None and sectionName in parser:
            override = parser[sectionName]

        defaults = parser["DEFAULT"]
        self.conf = {}
        for key in (self.CLIENT_ID, self.CLIENT_SECRET, self.AUTHZ_ENDPOINT,
                    self.TOKEN_ENDPOINT, self.REDIRECT_URI, self.SCOPE):
            if override is not None and key in override:
                self.conf[key] = override[key]
            else:
                self.conf[key] = defaults[key]

    def __initCacheSection(self):
        """Derive the cache section name from the authz endpoint and client id."""
        # MD5 is used only to build a stable section name; keep it so that
        # sections written by earlier runs are still found.
        digest = hashlib.md5()
        for key in (self.AUTHZ_ENDPOINT, self.CLIENT_ID):
            digest.update(self.conf[key].encode("utf-8"))
        self.cacheSection = digest.hexdigest()

    def __loadCacheTokens(self):
        """Load the tokens from the cache section, if that section exists."""
        # Re-read the file into the existing parser before looking up the cache.
        with open(self.configFile, "r") as f:
            self.orgConf.read_file(f)
        if self.cacheSection not in self.orgConf:
            return
        cached = self.orgConf[self.cacheSection]
        if self.ACCESS_TOKEN in cached:
            self.accessToken = cached[self.ACCESS_TOKEN]
        if self.REFRESH_TOKEN in cached:
            self.refreshToken = cached[self.REFRESH_TOKEN]

    def __saveCacheTokens(self):
        """Write the known tokens to the cache section of the config file."""
        tokens = {}
        for key, value in ((self.ACCESS_TOKEN, self.accessToken),
                           (self.REFRESH_TOKEN, self.refreshToken)):
            # ConfigParser only accepts string values, so unknown tokens are
            # left out instead of being stored as None.
            if value is not None:
                # "%" starts an interpolation; escape it so the value
                # round-trips unchanged.
                tokens[key] = value.replace("%", "%%")
        self.orgConf[self.cacheSection] = tokens
        with open(self.configFile, "w") as f:
            self.orgConf.write(f)

    def __requestToken(self, params):
        """POST `params` to the token endpoint and return the decoded JSON reply."""
        data = urllib.parse.urlencode(params).encode("utf-8")
        request = urllib.request.Request(self.conf[self.TOKEN_ENDPOINT])
        request.add_header("Content-Type", "application/x-www-form-urlencoded; charset=utf-8")
        # The context manager closes the HTTP response once it has been read.
        with urllib.request.urlopen(request, data) as response:
            return json.loads(response.read().decode("utf-8"))

    def getUrlForAuthCode(self):
        """Get the URL for getting an authorized code.

        This URL is accessed with a web browser.

        Return: URL
        """
        params = {"response_type": "code"}
        for key in (self.CLIENT_ID, self.REDIRECT_URI, self.SCOPE):
            params[key] = self.conf[key]
        return "{0}?{1}".format(
            self.conf[self.AUTHZ_ENDPOINT], urllib.parse.urlencode(params))

    def getAccessToken(self, code=None):
        """Get access token.

        Arguments:
        code -- The authorized code that is got by accessing the URL returned
            from `self.getUrlForAuthCode`. If this is `None`, the current
            access token is returned (`None` when there is no token yet).

        Return: access token
        """
        if code is not None:
            # exchange the `code` for an access token (and a refresh token)
            params = {"code": code, "grant_type": "authorization_code"}
            for key in (self.CLIENT_ID, self.CLIENT_SECRET, self.REDIRECT_URI):
                params[key] = self.conf[key]
            root = self.__requestToken(params)
            self.accessToken = root[self.ACCESS_TOKEN]
            # `refresh_token` is optional in the response; keep the one we
            # already have when the server does not send a new one.
            self.refreshToken = root.get(self.REFRESH_TOKEN, self.refreshToken)
            self.__saveCacheTokens()
        return self.accessToken

    def refreshAccessToken(self):
        """Refresh current access token with refresh token

        Return: access token
        """
        params = {"grant_type": "refresh_token",
                  "refresh_token": self.refreshToken}
        for key in (self.CLIENT_ID, self.CLIENT_SECRET):
            params[key] = self.conf[key]
        root = self.__requestToken(params)
        self.accessToken = root[self.ACCESS_TOKEN]
        # Adopt a replacement refresh token if the server issued one.
        self.refreshToken = root.get(self.REFRESH_TOKEN, self.refreshToken)
        self.__saveCacheTokens()
        return self.accessToken
#---------------------------------
def __main():
    """You can run this test method after preparing the following ini file.

    ======================
    [DEFAULT]
    authz_endpoint = https://accounts.google.com/o/oauth2/auth
    token_endpoint = https://accounts.google.com/o/oauth2/token
    client_id = [Your Client ID]
    client_secret = [Your Client Secret]
    redirect_uri = urn:ietf:wg:oauth:2.0:oob
    scope = https://www.google.com/reader/atom
    ======================

    The client id and client secret can be got from "Google APIs Console".
    Google APIs Console: https://code.google.com/apis/console/

    The cached access token is used when there is one; otherwise the user is
    asked to authorize in a browser and paste the resulting code. If the test
    request is rejected with HTTP 401, the token is refreshed and the request
    is retried once. Any other HTTP error is propagated.
    """
    def doTest(token):
        """Fetch the test URL with `token` and print the raw response body."""
        print("access token: " + token)
        testurl = "https://www.google.com/reader/atom/user/-/state/com.google/reading-list"
        request = urllib.request.Request(testurl)
        request.add_header("Authorization", "OAuth " + token)
        # Close the response as soon as its body has been read.
        with urllib.request.urlopen(request) as f:
            print(f.read())

    oauth2 = OAuth2()
    token = oauth2.getAccessToken()
    if token is None:
        print("Input the following URL into your browser and access it!")
        print(oauth2.getUrlForAuthCode())
        code = input("Paste the code ---> ")
        token = oauth2.getAccessToken(code)
    try:
        doTest(token)
    except urllib.error.HTTPError as e:
        # Only a 401 is handled by refreshing the token; re-raise everything
        # else instead of silently dropping it.
        if e.code != 401:
            raise
        token = oauth2.refreshAccessToken()
        doTest(token)
# Run the manual test only when this file is executed as a script.
if __name__ == "__main__":
    __main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment