Last active
July 15, 2018 05:41
-
-
Save bdeshi/213433700a9a2485d259ab902a222d18 to your computer and use it in GitHub Desktop.
Twitter API test helper [WIP]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""basic twitter api""" | |
from time import time | |
from math import floor, ceil | |
from os import urandom | |
from base64 import standard_b64encode as base64encode | |
import hashlib | |
import hmac | |
from urllib.parse import quote | |
import httplib2 | |
def urlquote(str):
    """Percent-encode *str* with an empty ``safe`` set.

    ``urllib.parse.quote`` is called with ``safe=''`` so that ``/`` (which
    ``quote`` leaves untouched by default) is escaped as well.

    The parameter keeps its original name, which shadows the builtin inside
    this function only, so that existing keyword calls keep working.

    Args:
        str: text (or bytes) to encode.

    Returns:
        The percent-encoded text.
    """
    return quote(str, safe='')
class Twi(object):
    """Small Twitter REST helper that signs requests with OAuth 1.0a.

    Attributes:
        api: base URL every endpoint is appended to.
        debug: when true, the intermediate signing values (parameter string,
            base string, signature, header) are printed. Off by default
            because those values expose the request signature.
    """

    api = 'https://api.twitter.com'
    debug = False
    # Signature methods this helper can compute, mapped to their hash function.
    _digests = {'HMAC-SHA1': hashlib.sha1}

    def __init__(self, ckey, cskey, akey=None, askey=None,
                 oauth_ver='1.0', oauth_method='HMAC-SHA1', api_ver='1.1'):
        """Store the credentials and protocol settings.

        Args:
            ckey: consumer key.
            cskey: consumer secret.
            akey: access token, or None when no token is available yet.
            askey: access token secret, or None.
            oauth_ver: value sent as ``oauth_version``.
            oauth_method: value sent as ``oauth_signature_method``
                (upper-cased before being stored).
            api_ver: API version path segment placed after ``api``.
        """
        self.ckey = ckey
        self.cskey = cskey
        self.akey = akey
        self.askey = askey
        self.oauth_method = oauth_method.upper()
        self.oauth_ver = str(oauth_ver)
        self.api_ver = str(api_ver)

    def _trace(self, label, value):
        """Print one labelled intermediate value when ``debug`` is enabled."""
        if self.debug:
            print(label + ':\n' + value)

    @staticmethod
    def _to_text(value):
        """Return *value* as the text that is signed and sent on the wire."""
        if isinstance(value, bool):
            # str(True) would give 'True'; send the lowercase spelling instead.
            return 'true' if value else 'false'
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return str(value)

    @classmethod
    def _encode_pairs(cls, params):
        """Return percent-encoded (key, value) pairs, sorted, skipping None values.

        Keys and values are encoded first and the *pairs* are sorted, so the
        order is by encoded key and then by encoded value.
        """
        return sorted(
            (urlquote(cls._to_text(key)), urlquote(cls._to_text(value)))
            for key, value in params.items()
            if value is not None
        )

    def _signature(self, endpoint_url, method, params):
        """Return the base64 HMAC signature for one request.

        Raises:
            ValueError: if ``oauth_method`` is not a supported signature method.
        """
        try:
            digestmod = self._digests[self.oauth_method]
        except KeyError:
            raise ValueError(
                'unsupported oauth_method: %r' % (self.oauth_method,)) from None
        param_str = '&'.join('%s=%s' % pair for pair in self._encode_pairs(params))
        self._trace('param_str', param_str)
        # The already-encoded parameter string is encoded a second time when it
        # becomes the third component of the signature base string.
        param_str = urlquote(param_str)
        self._trace('param_str_enc', param_str)
        base_str = '&'.join([
            urlquote(method.upper()),
            urlquote(endpoint_url),
            param_str])
        self._trace('base_str', base_str)
        # A missing token secret still leaves the '&' separator in the key.
        signing_key = urlquote(self.cskey) + '&' + urlquote(self.askey or '')
        digest = hmac.new(signing_key.encode('utf-8'),
                          base_str.encode('utf-8'),
                          digestmod).digest()
        b64_sign = base64encode(digest).decode('ascii')
        self._trace('b64_sign', b64_sign)
        return b64_sign

    def auth(self, endpoint_url, method, params, nonce=None, timestamp=None):
        """Generate the ``Authorization`` header for a request.

        See https://developer.twitter.com/en/docs/basics/authentication/guides/creating-a-signature.html

        Args:
            endpoint_url: full request URL without query string.
            method: HTTP method; upper-cased for the signature.
            params: mapping of request parameters to sign (None values are
                ignored); may be None or empty.
            nonce: value for ``oauth_nonce``; a fresh random one is generated
                when omitted.
            timestamp: value for ``oauth_timestamp``; the current time in
                whole seconds is used when omitted.

        Returns:
            A dict holding the single ``Authorization`` header.

        Raises:
            ValueError: if ``oauth_method`` is not a supported signature method.
        """
        oauth_params = {
            'oauth_consumer_key': self.ckey,
            'oauth_nonce': urandom(32).hex() if nonce is None else nonce,
            'oauth_signature_method': self.oauth_method,
            'oauth_timestamp': str(int(time())) if timestamp is None else str(timestamp),
            'oauth_token': self.akey,
            'oauth_version': self.oauth_ver,
        }
        # The request parameters and the oauth_* parameters are signed together.
        signed = dict(params or {})
        signed.update(oauth_params)
        oauth_params['oauth_signature'] = self._signature(endpoint_url, method, signed)
        auth_bundle = 'OAuth ' + ','.join(
            '%s="%s"' % pair for pair in self._encode_pairs(oauth_params))
        self._trace('auth_bundle', auth_bundle)
        return {"Authorization": auth_bundle}

    def get_oauth(self):
        """Retrieve oauth token+secret for the given consumer key+secret."""
        raise NotImplementedError()

    def request(self, endpoint, method, **params):
        """Send an authorized request to *endpoint* and return the response body.

        Args:
            endpoint: endpoint path below the API version; ``.json`` is
                appended when missing.
            method: HTTP method.
            **params: request parameters; None values are dropped.

        Returns:
            The content returned by ``httplib2.Http.request``.
        """
        method = method.upper()
        endpoint = endpoint.lstrip('/')  # avoid '//' when joining below
        if not endpoint.endswith('.json'):
            endpoint = endpoint + '.json'
        endpoint_url = '/'.join([self.api, self.api_ver, endpoint])
        headers = self.auth(endpoint_url, method, params)
        # Encode with the same helper the signature uses so the transmitted
        # parameters are exactly the ones that were signed.
        encoded = '&'.join('%s=%s' % pair for pair in self._encode_pairs(params))
        target, body = endpoint_url, None
        if encoded:
            if method in ('POST', 'PUT', 'PATCH'):
                body = encoded
                headers['Content-Type'] = 'application/x-www-form-urlencoded'
            else:
                target = endpoint_url + '?' + encoded
        http = httplib2.Http()
        _response, content = http.request(target, method, body=body, headers=headers)
        return content
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment