Last active
November 12, 2024 11:34
-
-
Save intrd/ba520b4d60019c7f3d2b84417226e83f to your computer and use it in GitHub Desktop.
X/Twitter - Simple Python3 OAuth 2.0 Authorization (PKCE), Refresh Token and Tweet Action without Tweepy or any 3rd-party libs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## X/Twitter - Simple Python3 OAuth 2.0 Authorization (PKCE), Refresh Token and Tweet Action without Tweepy or any 3rd-party libs. | |
# Author: [email protected] | |
# Action: Create Refresh Token | |
import os | |
import secrets | |
import string | |
import requests | |
from urllib.parse import urlencode | |
from datetime import datetime, timedelta | |
import base64 | |
import hashlib | |
# Set your Twitter API credentials | |
# API Key and Secret found @ https://developer.twitter.com/en/portal/projects/ | |
client_id = "xxxxOG1pVHxxxxxxxxxxxxxxxxxxxxxxxx" | |
client_secret = "xxxxxxxxxxxxxxxxAmFlxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" | |
# Set the desired scopes | |
scopes = ["tweet.read", "users.read", "tweet.write", "offline.access"] | |
# Generate the PKCE code challenge and code verifier | |
code_verifier = "".join(secrets.choice(string.ascii_uppercase + string.digits + "-._~") for _ in range(43)) | |
code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest()).decode().replace("=", "") | |
# Build the authorization URL | |
auth_url = "https://twitter.com/i/oauth2/authorize" | |
params = { | |
"response_type": "code", | |
"client_id": client_id, | |
"redirect_uri": "http://dann.com.br/callback", | |
"scope": " ".join(scopes), | |
"code_challenge": code_challenge, | |
"code_challenge_method": "S256", | |
"state": "some_random_state_value" | |
} | |
auth_url += "?" + urlencode(params) | |
print("Please visit the following URL and authorize the application:") | |
print(auth_url) | |
print() | |
# Prompt the user to enter the authorization code | |
authorization_code = input("Enter the authorization code: ") | |
# Exchange the authorization code for an access token | |
token_url = "https://api.twitter.com/2/oauth2/token" | |
data = { | |
"grant_type": "authorization_code", | |
"code": authorization_code, | |
"redirect_uri": "http://dann.com.br/callback", | |
"client_id": client_id, | |
"code_verifier": code_verifier | |
} | |
response = requests.post(token_url, data=data, auth=(client_id, client_secret)) | |
if response.status_code == 200: | |
access_token = response.json()["access_token"] | |
refresh_token = response.json()["refresh_token"] | |
expires_in = response.json()["expires_in"] | |
print("Access token:", access_token) | |
print("Refresh token:", refresh_token) | |
print("Token expires in:", expires_in, "seconds") | |
else: | |
print("Error:", response.status_code, response.text) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## X/Twitter - Simple Python3 OAuth 2.0 Authorization (PKCE), Refresh Token and Tweet Action without Tweepy or any 3rd-party libs. | |
# Author: [email protected] | |
# Action: Refresh token and Tweet | |
import requests, os, time | |
import os | |
import time | |
# Set your Twitter API credentials | |
# API Key and Secret found @ https://developer.twitter.com/en/portal/projects/ | |
client_id = "xxxxOG1pVHxxxxxxxxxxxxxxxxxxxxxxxx" | |
client_secret = "xxxxxxxxxxxxxxxxAmFlxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" | |
refresh_token_file = "twitter_refresh_token.txt" | |
token_expiration_file = "twitter_token_expiration.txt" | |
access_token_file = "twitter_access_token.txt" | |
# Check if the access token file exists | |
if os.path.exists(access_token_file): | |
with open(refresh_token_file, "r") as file: | |
refresh_token = file.read().strip() | |
# Load the access token from the file | |
with open(access_token_file, "r") as file: | |
access_token = file.read().strip() | |
# Check if the token expiration time file exists | |
if os.path.exists(token_expiration_file): | |
# Load the token expiration time from the file | |
with open(token_expiration_file, "r") as file: | |
token_expiration_time = float(file.read().strip()) | |
# Check if the token is expired | |
if time.time() > token_expiration_time: | |
print("Token is expired, refreshing...") | |
# Refresh the access token | |
token_url = "https://api.twitter.com/2/oauth2/token" | |
data = { | |
"grant_type": "refresh_token", | |
"refresh_token": refresh_token, | |
"client_id": client_id | |
} | |
response = requests.post(token_url, data=data, auth=(client_id, client_secret)) | |
if response.status_code == 200: | |
access_token = response.json()["access_token"] | |
new_refresh_token = response.json()["refresh_token"] | |
expires_in = response.json()["expires_in"] | |
token_expiration_time = time.time() + expires_in | |
print("New access token:", access_token) | |
print("New refresh token:", new_refresh_token) | |
print("Token expires in:", expires_in, "seconds") | |
# Save the new access token and expiration time to the respective files | |
with open(access_token_file, "w") as file: | |
file.write(access_token) | |
with open(refresh_token_file, "w") as file: | |
file.write(new_refresh_token) | |
with open(token_expiration_file, "w") as file: | |
file.write(str(token_expiration_time)) | |
else: | |
print("Error:", response.status_code, response.text) | |
else: | |
print("Token is still valid, no need to refresh.") | |
print("* valid access token") | |
else: | |
print("Token expiration time file not found, refreshing token...") | |
else: | |
print("Access token file not found, please generate a new access token.") | |
# Tweet a test message using the access token | |
if os.path.exists(access_token_file): | |
tweet_url = "https://api.twitter.com/2/tweets" | |
tweet_text = "This is a test." | |
headers = { | |
"Authorization": f"Bearer {access_token}", | |
"Content-Type": "application/json" | |
} | |
tweet_data = { | |
"text": tweet_text | |
} | |
tweet_response = requests.post(tweet_url, headers=headers, json=tweet_data) | |
if tweet_response.status_code == 201: | |
print("Tweet posted successfully!") | |
else: | |
print("Error posting tweet:", tweet_response.status_code, tweet_response.text) | |
else: | |
print("No access token found, unable to tweet.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment