Created
July 15, 2025 19:04
-
-
Save quintonwall/10065deb3b83fa44cb9cb3fde4dbea66 to your computer and use it in GitHub Desktop.
demo of Airbyte oauth flow via API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
import os | |
from typing import Dict, Any, Optional | |
class AirbyteOAuthFlow: | |
""" | |
A class to handle the Airbyte OAuth flow sequence: | |
1. Create access token | |
2. Create/update organization OAuth credentials | |
3. Initiate OAuth | |
""" | |
def __init__(self, client_id: str, client_secret: str, base_url: str = "https://api.airbyte.com/v1"): | |
self.client_id = client_id | |
self.client_secret = client_secret | |
self.base_url = base_url | |
self.access_token = None | |
self.session = requests.Session() | |
def create_access_token(self) -> Dict[str, Any]: | |
""" | |
Step 1: Create an access token using client credentials | |
""" | |
# Try multiple endpoint variations based on the error response | |
endpoints_to_try = [ | |
f"{self.base_url}/applications/token", | |
"https://api.airbyte.com/api/public/v1/applications/token", # From error response | |
f"{self.base_url.replace('/v1', '/api/public/v1')}/applications/token" | |
] | |
payload = { | |
"client_id": self.client_id, | |
"client_secret": self.client_secret, | |
"grant_type": "client_credentials" | |
} | |
print("Step 1: Creating access token...") | |
for i, url in enumerate(endpoints_to_try, 1): | |
print(f"\nπ Attempt {i}: Trying endpoint {url}") | |
# Try JSON format first | |
headers = { | |
"accept": "application/json", | |
"content-type": "application/json" | |
} | |
print(f"π Request headers: {headers}") | |
print(f"π Request payload: {payload}") | |
response = requests.post(url, json=payload, headers=headers) | |
print(f"π Response status: {response.status_code}") | |
print(f"π Response headers: {dict(response.headers)}") | |
if response.status_code == 200: | |
token_data = response.json() | |
self.access_token = token_data.get("access_token") | |
print(f"β Access token created successfully with endpoint {url}") | |
print(f"Token type: {token_data.get('token_type')}") | |
print(f"Expires in: {token_data.get('expires_in')} seconds") | |
# Set authorization header for subsequent requests | |
self.session.headers.update({ | |
"Authorization": f"Bearer {self.access_token}", | |
"accept": "application/json", | |
"content-type": "application/json" | |
}) | |
return token_data | |
elif response.status_code == 500: | |
print(f"β 500 error with {url}") | |
print(f"Response: {response.text}") | |
# Try form-encoded format for this endpoint | |
print(f"π Trying form-encoded format for {url}") | |
form_headers = { | |
"accept": "application/json", | |
"content-type": "application/x-www-form-urlencoded" | |
} | |
alt_response = requests.post(url, data=payload, headers=form_headers) | |
print(f"π Form-encoded response status: {alt_response.status_code}") | |
if alt_response.status_code == 200: | |
token_data = alt_response.json() | |
self.access_token = token_data.get("access_token") | |
print(f"β Access token created successfully with form-encoded format") | |
print(f"Token type: {token_data.get('token_type')}") | |
print(f"Expires in: {token_data.get('expires_in')} seconds") | |
# Set authorization header for subsequent requests | |
self.session.headers.update({ | |
"Authorization": f"Bearer {self.access_token}", | |
"accept": "application/json", | |
"content-type": "application/json" | |
}) | |
return token_data | |
else: | |
print(f"β Form-encoded also failed: {alt_response.status_code}") | |
print(f"Form-encoded response: {alt_response.text}") | |
else: | |
print(f"β Failed with status {response.status_code}") | |
print(f"Response: {response.text}") | |
# If all attempts failed, raise the last response error | |
print(f"\nπ₯ All endpoint attempts failed. This appears to be a server-side issue with the Airbyte API.") | |
print(f"The error suggests an internal Java framework issue in their API server.") | |
print(f"You may need to:") | |
print(f" 1. Contact Airbyte support about this server error") | |
print(f" 2. Check if your API credentials are for the correct environment (production vs sandbox)") | |
print(f" 3. Verify your account has the necessary permissions") | |
print(f" 4. Try again later as this may be a temporary server issue") | |
response.raise_for_status() | |
def create_or_update_oauth_credentials(self, | |
organization_id: str, | |
actor_type: str, | |
source_name: str, | |
configuration: Dict[str, Any]) -> Dict[str, Any]: | |
""" | |
Step 2: Create or update organization OAuth credentials | |
Args: | |
organization_id: The organization ID | |
actor_type: "source" or "destination" | |
source_name: Name of the source (e.g., "google-ads", "facebook-marketing") | |
configuration: OAuth configuration specific to the source | |
""" | |
if not self.access_token: | |
raise ValueError("Access token not available. Call create_access_token() first.") | |
url = f"{self.base_url}/organizations/{organization_id}/oauthCredentials" | |
payload = { | |
"actorType": actor_type, | |
"name": source_name, | |
"configuration": { | |
"credentials": configuration | |
} | |
} | |
print(f"Step 2: Creating/updating OAuth credentials for {source_name}...") | |
print(f"π OAuth payload: {json.dumps(payload, indent=2)}") | |
response = self.session.put(url, json=payload) | |
if response.status_code == 200: | |
print(f"β OAuth credentials updated successfully for {source_name}") | |
return response.json() if response.text else {"status": "success"} | |
else: | |
print(f"β Failed to update OAuth credentials: {response.status_code}") | |
print(f"Response: {response.text}") | |
if response.status_code == 403: | |
print("β οΈ 403 Forbidden - This could be due to:") | |
print(" - Invalid API key") | |
print(" - Insufficient permissions for the organization") | |
print(" - Organization ID not accessible with current credentials") | |
response.raise_for_status() | |
def initiate_oauth(self, | |
redirect_url: str, | |
workspace_id: str, | |
source_type: str, | |
oauth_input_configuration: Optional[str] = None) -> Dict[str, Any]: | |
""" | |
Step 3: Initiate OAuth for a source | |
Args: | |
redirect_url: URL to redirect after OAuth completion | |
workspace_id: The workspace ID to create the secret | |
source_type: Type of source (e.g., "google-ads", "facebook-marketing") | |
oauth_input_configuration: Optional OAuth input configuration | |
""" | |
if not self.access_token: | |
raise ValueError("Access token not available. Call create_access_token() first.") | |
url = f"{self.base_url}/sources/initiateOAuth" | |
payload = { | |
"redirectUrl": redirect_url, | |
"workspaceId": workspace_id, | |
"sourceType": source_type | |
} | |
if oauth_input_configuration: | |
payload["oAuthInputConfiguration"] = oauth_input_configuration | |
print(f"Step 3: Initiating OAuth for {source_type}...") | |
response = self.session.post(url, json=payload) | |
if response.status_code == 200: | |
oauth_data = response.json() | |
print(f"β OAuth initiated successfully for {source_type}") | |
print(f"Redirect URL: {oauth_data.get('redirect_url', 'Not provided')}") | |
return oauth_data | |
else: | |
print(f"β Failed to initiate OAuth: {response.status_code}") | |
print(f"Response: {response.text}") | |
response.raise_for_status() | |
def run_complete_flow(self, | |
organization_id: str, | |
workspace_id: str, | |
source_type: str, | |
redirect_url: str, | |
oauth_configuration: Dict[str, Any], | |
oauth_input_configuration: Optional[str] = None) -> Dict[str, Any]: | |
""" | |
Run the complete OAuth flow sequence | |
""" | |
try: | |
print("π Starting Airbyte OAuth Flow...") | |
print("=" * 50) | |
# Step 1: Create access token | |
token_data = self.create_access_token() | |
print("\n" + "=" * 50) | |
# Step 2: Create/update OAuth credentials | |
credentials_data = self.create_or_update_oauth_credentials( | |
organization_id=organization_id, | |
actor_type="source", | |
source_name=source_type, | |
configuration=oauth_configuration | |
) | |
print("\n" + "=" * 50) | |
# Step 3: Initiate OAuth | |
oauth_data = self.initiate_oauth( | |
redirect_url=redirect_url, | |
workspace_id=workspace_id, | |
source_type=source_type, | |
oauth_input_configuration=oauth_input_configuration | |
) | |
print("\n" + "=" * 50) | |
print("β Complete OAuth flow finished successfully!") | |
return { | |
"token_data": token_data, | |
"credentials_data": credentials_data, | |
"oauth_data": oauth_data | |
} | |
except requests.exceptions.RequestException as e: | |
print(f"β Request failed: {e}") | |
raise | |
except Exception as e: | |
print(f"β Unexpected error: {e}") | |
raise | |
def main(): | |
""" | |
Example usage of the AirbyteOAuthFlow class | |
""" | |
# Configuration - Replace with your actual values | |
CLIENT_ID = os.getenv("AIRBYTE_CLIENT_ID", "your_client_id_here") | |
CLIENT_SECRET = os.getenv("AIRBYTE_CLIENT_SECRET", "your_client_secret_here") | |
ORGANIZATION_ID = os.getenv("AIRBYTE_ORGANIZATION_ID", "your_organization_id_here") | |
WORKSPACE_ID = os.getenv("AIRBYTE_WORKSPACE_ID", "your_workspace_id_here") | |
# OAuth flow configuration | |
SOURCE_TYPE = "github" # Change to your desired source type | |
REDIRECT_URL = "https://localhost/oauth/callback" # Your OAuth callback URL | |
# Example OAuth configuration for GitHub | |
# This will vary depending on the source type | |
OAUTH_CONFIGURATION = { | |
"client_id": "your_github_oauth_client_id", | |
"client_secret": "your_github_oauth_client_secret" | |
} | |
# Initialize the OAuth flow | |
oauth_flow = AirbyteOAuthFlow( | |
client_id=CLIENT_ID, | |
client_secret=CLIENT_SECRET | |
) | |
try: | |
# Run the complete flow | |
result = oauth_flow.run_complete_flow( | |
organization_id=ORGANIZATION_ID, | |
workspace_id=WORKSPACE_ID, | |
source_type=SOURCE_TYPE, | |
redirect_url=REDIRECT_URL, | |
oauth_configuration=OAUTH_CONFIGURATION | |
) | |
print("\nπ OAuth flow completed successfully!") | |
print("You can now redirect users to the OAuth URL to complete authentication.") | |
except Exception as e: | |
print(f"\nπ₯ OAuth flow failed: {e}") | |
return 1 | |
return 0 | |
if __name__ == "__main__": | |
exit(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
requests>=2.31.0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment