Created
March 11, 2020 15:18
-
-
Save cnicodeme/c3311ac6ec3b80f74e9e17a250b4d6db to your computer and use it in GitHub Desktop.
Custom implementation of the Twitter OAuth API.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding:utf-8 -*- | |
from urllib.parse import parse_qs, quote | |
import oauth2, time, requests, json | |
""" | |
Usage: | |
# To setup the access: | |
# GET /twitter/authorize | |
``` | |
twitter = Twitter('CONSUMER_KEY', 'CONSUMER_SECRET') | |
tokens = twitter.get_temporary_token() | |
session['twitter_oauth'] = tokens | |
redirect(twitter.get_authorize_url(tokens['token']) | |
``` | |
# Response from Twitter | |
# GET /twitter/validate?oauth_token={twitter_oauth_token}&oauth_verifier={twitter_oauth_verifier} | |
``` | |
twitter = Twitter('CONSUMER_KEY', 'CONSUMER_SECRET') | |
# We take Flask's "request.args" to get args from the URL | |
assert request.args.get('oauth_token') == session['twitter_oauth']['token'] | |
oauth = twitter.verify(session['twitter_oauth'], request.args.get('oauth_verifier')) | |
# oauth will contain the real oauth_token and oauth_token_secret along with the user_id and screen_name | |
print(oauth['screen_name']) | |
# Save `oauth` details in the database | |
``` | |
# GET /twitter/details | |
``` | |
twitter = Twitter.init(oauth['oauth_token'], oauth['oauth_token_secret'], 'CONSUMER_KEY', 'CONSUMER_SECRET') | |
return jsonify(twitter.get_user(oauth['screen_name'])) | |
``` | |
""" | |
TWITTER_REQUEST_TOKEN_URL = 'https://api.twitter.com/oauth/request_token' | |
TWITTER_AUTHORIZE_URL = 'https://api.twitter.com/oauth/authorize' | |
TWITTER_ACCESS_TOKEN_URL = 'https://api.twitter.com/oauth/access_token' | |
TWITTER_BASE_ENDPOINT = "https://api.twitter.com/1.1/" | |
class Twitter: | |
def __init__(self, consumer_key, consumer_secret): | |
self.consumer_key = consumer_key | |
self.consumer_secret = consumer_secret | |
if self.consumer_key is None: | |
raise AttributeError('Missing consumer key') | |
if self.consumer_secret is None: | |
raise AttributeError('Missing consumer key') | |
self.consumer = oauth2.Consumer(self.consumer_key, self.consumer_secret) | |
def get_temporary_token(self, redirect=None): | |
client = oauth2.Client(self.consumer) | |
url = TWITTER_REQUEST_TOKEN_URL | |
if redirect: | |
url += '?oauth_callback={0}'.format(quote(redirect)) | |
response, content = client.request(url, 'GET') | |
assert response['status'] == '200' | |
response = parse_qs(content) | |
assert response.get(b'oauth_callback_confirmed', None)[0] == b'true' | |
return { | |
'token': response.get(b'oauth_token')[0].decode('utf-8'), | |
'token_secret': response.get(b'oauth_token_secret')[0].decode('utf-8'), | |
} | |
def get_authorize_url(self, oauth_token): | |
return '{0}?oauth_token={1}'.format(TWITTER_AUTHORIZE_URL, oauth_token) | |
def verify(self, oauth_tokens, oauth_verifier): | |
token = oauth2.Token(oauth_tokens['token'], oauth_tokens['token_secret']) | |
token.set_verifier(oauth_verifier) | |
client = oauth2.Client(self.consumer, token) | |
response, content = client.request(TWITTER_ACCESS_TOKEN_URL, 'POST') | |
assert response['status'] == '200' | |
response = parse_qs(content) | |
result = { | |
'oauth_token': response.get(b'oauth_token')[0].decode('utf-8'), | |
'oauth_token_secret': response.get(b'oauth_token_secret')[0].decode('utf-8'), | |
'user_id': response.get(b'user_id')[0].decode('utf-8'), | |
'screen_name': response.get(b'screen_name')[0].decode('utf-8') # Without @ prefix | |
} | |
self.set_access_token(result['oauth_token'], result['oauth_token_secret']) | |
return result | |
def set_access_token(self, oauth_token, oauth_token_secret): | |
self.token = oauth2.Token(oauth_token, oauth_token_secret) | |
def _request(self, method, path, params=None): | |
if self.token is None: | |
raise ValueError('Missing token instance') | |
if method not in ('GET', 'POST'): | |
raise ValueError('Only GET/POST method accepted.') | |
url = '{0}{1}.json'.format(TWITTER_BASE_ENDPOINT, path) | |
if method == 'GET': | |
params = params or {} | |
params.update({ | |
'oauth_version': "1.0", | |
'oauth_nonce': oauth2.generate_nonce(), | |
'oauth_timestamp': str(int(time.time())), | |
}) | |
request = oauth2.Request(method, url, params) | |
request.sign_request(oauth2.SignatureMethod_HMAC_SHA1(), self.consumer, self.token) | |
response = requests.get(request.to_url()) | |
else: | |
request_params = { | |
'oauth_version': "1.0", | |
'oauth_nonce': oauth2.generate_nonce(), | |
'oauth_timestamp': str(int(time.time())), | |
} | |
request = oauth2.Request(method, url, request_params, json.dumps(params).encode('utf-8')) | |
request.sign_request(oauth2.SignatureMethod_HMAC_SHA1(), self.consumer, self.token) | |
response = requests.post(url, json=params, headers={ | |
'Content-Type': 'application/json', | |
'Accept': 'application/json', | |
'authorization': request.to_header()['Authorization'] | |
}) | |
response.raise_for_status() | |
return response.json() | |
def get(self, path, params=None): | |
return self._request('GET', path, params) | |
def post(self, path, params=None): | |
return self._request('POST', path, params) | |
def get_user(self, screen_name, include_entities=False): | |
return self.get('users/show', {'screen_name': screen_name, 'include_entities': include_entities}) | |
def send_message(self, recipient, message): | |
""" | |
Sends a direct message to a specific user ID (not screen name) | |
@rate-limit: 1000 per user; 15000 per app | Per 24 hour window | |
""" | |
return self.post('direct_messages/events/new', { | |
"event": { | |
"type": "message_create", | |
"message_create": { | |
"target": {"recipient_id": recipient}, | |
"message_data": {"text": message} | |
} | |
} | |
}) | |
def list_messages(self, cursor=None, count=50): | |
""" | |
count: 50 is max. | |
""" | |
params = { | |
'count': str(count) | |
} | |
if cursor: | |
params['cursor'] = str(cursor) | |
return self.get('direct_messages/events/list', params) | |
@classmethod | |
def init(cls, consumer_key, consumer_secret, token, secret): | |
tw = Twitter(consumer_key, consumer_secret) | |
tw.set_access_token(token, secret) | |
return tw |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment