Last active
August 29, 2015 14:11
-
-
Save jstacoder/d3ee56db1bc6d22caa83 to your computer and use it in GitHub Desktop.
fully scripted basecamp oauth api authentication, via mechanize with a little help from Flask
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import flask | |
app = flask.Flask(__name__) | |
# for this to work you need to register your redirect uri to SERVER_IP/auth/confirm | |
@app.route('/auth/confirm') | |
def get(): | |
return flask.jsonify(dict(code=flask.request.args.get('code',None))) | |
def run_server(): | |
app.run(host='0.0.0.0',port=8080,debug=False) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from helpers import TokenData,RThread,get_browser | |
from constants import CLIENT_ID,CLIENT_SECRET,REDIRECT_URI | |
from app import run_server | |
import os | |
import requests | |
def get_code(username,password): | |
b = get_browser() | |
b.open('https://launchpad.37signals.com/authorization/new?type=web_server&client_id={}&redirect_uri={}'.format(CLIENT_ID,REDIRECT_URI)) | |
b.select_form(nr=0) | |
b.form['username'] = username | |
b.form['password'] = password | |
res = b.submit() | |
b.select_form(nr=0) | |
code = json.loads(b.submit().read().replace('\n','')).get('code') | |
return get_token(code) | |
def get_token(code): | |
url = 'https://launchpad.37signals.com/authorization/token?type=web_server&client_id={}&redirect_uri={}&client_secret={}&code={}'.format( | |
CLIENT_ID,REDIRECT_URI,CLIENT_SECRET,code) | |
res = requests.post(url) | |
return res | |
def auth(username,password): | |
t1 = RThread(func=run_server) | |
t2 = RThread(get_code,username,password) | |
t1.start() | |
t2.start() | |
rtn = t2.result | |
t1.terminate() | |
return rtn.json() if rtn.ok else rtn.content | |
def get_data(username,password): | |
return TokenData(**auth(username,password)) | |
if __name__ == "__main__": | |
username = os.environ.get('BASECAMP_USERNAME') | |
password = os.environ.get('BASECAMP_PASSWORD') | |
print auth(username,password) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CLIENT_ID = 'basecamp oauth client id' | |
CLIENT_SECRET = 'basecamp oauth client secret' | |
REDIRECT_URI = 'basecamp registerd oauth redirect uri' | |
AUTH_HEADER_FORMAT = 'Authorization: Bearer %s' |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import mechanize | |
from time import sleep | |
import ctypes | |
import cookielib | |
def _async_raise(tid, excobj): | |
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(excobj)) | |
if res == 0: | |
raise ValueError("nonexistent thread id") | |
elif res > 1: | |
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0) | |
raise SystemError("PyThreadState_SetAsyncExc failed") | |
class Thread(threading.Thread): | |
def raise_exc(self, excobj): | |
assert self.isAlive(), "thread must be started" | |
for tid, tobj in threading._active.items(): | |
if tobj is self: | |
_async_raise(tid, excobj) | |
return | |
def terminate(self): | |
self.raise_exc(SystemExit) | |
class RThread(Thread): | |
_result = None | |
def __init__(self,func,*args,**kwargs): | |
threading.Thread.__init__(self) | |
self.func = func | |
self.args = args | |
self.kwargs = kwargs | |
def run(self): | |
self._result = self.func(*self.args,**self.kwargs) | |
@property | |
def result(self): | |
if self._result is None: | |
if not self.is_alive(): | |
self.run() | |
while self.is_alive(): | |
sleep(.05) | |
if self.is_alive(): | |
continue | |
else: | |
break | |
return self._result | |
def get_browser(debug=False): | |
browser = mechanize.Browser() | |
browser.set_handle_equiv(True) | |
browser.set_handle_redirect(True) | |
browser.set_handle_referer(True) | |
browser.set_handle_robots(False) | |
browser.set_handle_refresh( | |
mechanize._http.HTTPRefreshProcessor(), | |
max_time=1, | |
) | |
if debug: | |
browser.set_debug_http(True) | |
browser.set_debug_redirects(True) | |
browser.set_debug_responses(True) | |
cj = cookielib.LWPCookieJar() | |
browser.set_cookiejar(cj) | |
browser.addheaders = [('User-agent', 'Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.1) Gecko/2008071615 Fedora/3.0.1-1.fc9 Firefox/3.0.1')] | |
return browser | |
class TokenData(object): | |
_access_token = None | |
_refresh_token = None | |
_expires_in = 0 | |
def __init__(self,access_token=None,refresh_token=None,expires_in=0): | |
self._refresh_token = refresh_token | |
self._access_token = access_token | |
self._expires_in = expires_in | |
@property | |
def expires(self): | |
if self._expires_in/60/60/24 > 1: | |
rtn = str(self._expires_in/60/60/24) + ' days' | |
elif self._expires_in/60/60 > 1: | |
rtn = str(self._expires_in/60/60) + ' hours' | |
elif self._expires_in/60 > 1: | |
rtn = str(self._expires_in/60) + ' minutes' | |
else: | |
rtn = str(self._expires_in ) + ' seconds' | |
return 'expires in ' + rtn | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment