Skip to content

Instantly share code, notes, and snippets.

@quintonwall
Created July 15, 2025 19:04
Show Gist options
  • Save quintonwall/10065deb3b83fa44cb9cb3fde4dbea66 to your computer and use it in GitHub Desktop.
Save quintonwall/10065deb3b83fa44cb9cb3fde4dbea66 to your computer and use it in GitHub Desktop.
Demo of the Airbyte OAuth flow via the API
import requests
import json
import os
from typing import Dict, Any, Optional
class AirbyteOAuthFlow:
    """
    A class to handle the Airbyte OAuth flow sequence:
    1. Create access token
    2. Create/update organization OAuth credentials
    3. Initiate OAuth

    Every HTTP call goes through one shared ``requests.Session`` and uses
    the timeout given to the constructor. Progress is reported with
    ``print``; sensitive values are masked before being printed.
    """

    # A payload key containing any of these substrings has its value masked
    # before the payload is printed.
    _SENSITIVE_KEY_PARTS = ("secret", "token", "password")

    def __init__(self, client_id: str, client_secret: str,
                 base_url: str = "https://api.airbyte.com/v1",
                 timeout: float = 30.0):
        """
        Args:
            client_id: Airbyte application client ID
            client_secret: Airbyte application client secret
            base_url: Root URL of the Airbyte API
            timeout: Seconds to wait on each HTTP request before giving up
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = base_url
        self.timeout = timeout
        self.access_token = None
        self.session = requests.Session()

    @classmethod
    def _redact(cls, value: Any) -> Any:
        """Return a copy of *value* with the values of sensitive keys masked."""
        if isinstance(value, dict):
            return {
                key: "***"
                if isinstance(key, str)
                and any(part in key.lower() for part in cls._SENSITIVE_KEY_PARTS)
                else cls._redact(item)
                for key, item in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [cls._redact(item) for item in value]
        return value

    @staticmethod
    def _raise_for_response(response) -> None:
        """Always raise for *response*, even when its status is not 4xx/5xx."""
        response.raise_for_status()
        # raise_for_status() is silent for unexpected non-error statuses;
        # raising here keeps callers from receiving an implicit None.
        raise requests.exceptions.HTTPError(
            f"Unexpected status {response.status_code} for url: {response.url}",
            response=response,
        )

    def _url(self, path: str) -> str:
        """Join *path* onto the base URL without doubling the slash."""
        return f"{self.base_url.rstrip('/')}{path}"

    def _token_endpoints(self) -> list:
        """Return the token endpoints to try, in order, without duplicates."""
        base = self.base_url.rstrip("/")
        public_suffix = "/api/public/v1"
        candidates = [
            f"{base}/applications/token",
            "https://api.airbyte.com/api/public/v1/applications/token",  # From error response
        ]
        # Only derive the public variant when the base really ends in a bare
        # "/v1"; rewriting a base that already is ".../api/public/v1" would
        # produce a broken URL.
        if base.endswith("/v1") and not base.endswith(public_suffix):
            candidates.append(f"{base[:-len('/v1')]}{public_suffix}/applications/token")
        # dict.fromkeys drops duplicates while preserving order.
        return list(dict.fromkeys(candidates))

    def _post_token_request(self, url: str, headers: Dict[str, Any], **body):
        """POST a token request through the shared session."""
        # A None value removes the session-level header for this request
        # only, so a bearer token from an earlier call is never sent to the
        # token endpoint.
        request_headers = {**headers, "Authorization": None}
        return self.session.post(url, headers=request_headers,
                                 timeout=self.timeout, **body)

    def _store_token(self, response, success_message: str) -> Dict[str, Any]:
        """Record the token from *response* and install it on the session."""
        token_data = response.json()
        access_token = token_data.get("access_token")
        if not access_token:
            raise ValueError("Token endpoint returned 200 without an access_token.")
        self.access_token = access_token
        print(success_message)
        print(f"Token type: {token_data.get('token_type')}")
        print(f"Expires in: {token_data.get('expires_in')} seconds")
        # Set authorization header for subsequent requests
        self.session.headers.update({
            "Authorization": f"Bearer {self.access_token}",
            "accept": "application/json",
            "content-type": "application/json"
        })
        return token_data

    def create_access_token(self) -> Dict[str, Any]:
        """
        Step 1: Create an access token using client credentials

        Each candidate endpoint is tried with a JSON body; when an endpoint
        answers 500 the same request is retried form-encoded.

        Returns:
            The decoded token response.

        Raises:
            requests.exceptions.HTTPError: if no endpoint issued a token
            ValueError: if a 200 response carries no ``access_token``
        """
        payload = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "grant_type": "client_credentials"
        }
        print("Step 1: Creating access token...")
        response = None
        for i, url in enumerate(self._token_endpoints(), 1):
            print(f"\n🔄 Attempt {i}: Trying endpoint {url}")
            # Try JSON format first
            headers = {
                "accept": "application/json",
                "content-type": "application/json"
            }
            print(f"🔍 Request headers: {headers}")
            print(f"🔍 Request payload: {self._redact(payload)}")
            response = self._post_token_request(url, headers, json=payload)
            print(f"🔍 Response status: {response.status_code}")
            print(f"🔍 Response headers: {dict(response.headers)}")
            if response.status_code == 200:
                return self._store_token(
                    response,
                    f"✅ Access token created successfully with endpoint {url}",
                )
            if response.status_code != 500:
                print(f"❌ Failed with status {response.status_code}")
                print(f"Response: {response.text}")
                continue
            print(f"❌ 500 error with {url}")
            print(f"Response: {response.text}")
            # Try form-encoded format for this endpoint
            print(f"🔄 Trying form-encoded format for {url}")
            form_headers = {
                "accept": "application/json",
                "content-type": "application/x-www-form-urlencoded"
            }
            alt_response = self._post_token_request(url, form_headers, data=payload)
            print(f"🔍 Form-encoded response status: {alt_response.status_code}")
            if alt_response.status_code == 200:
                return self._store_token(
                    alt_response,
                    "✅ Access token created successfully with form-encoded format",
                )
            print(f"❌ Form-encoded also failed: {alt_response.status_code}")
            print(f"Form-encoded response: {alt_response.text}")
        # If all attempts failed, raise the last response error
        print("\n💥 All endpoint attempts failed. This appears to be a server-side issue with the Airbyte API.")
        print("The error suggests an internal Java framework issue in their API server.")
        print("You may need to:")
        print(" 1. Contact Airbyte support about this server error")
        print(" 2. Check if your API credentials are for the correct environment (production vs sandbox)")
        print(" 3. Verify your account has the necessary permissions")
        print(" 4. Try again later as this may be a temporary server issue")
        self._raise_for_response(response)

    def create_or_update_oauth_credentials(self,
                                           organization_id: str,
                                           actor_type: str,
                                           source_name: str,
                                           configuration: Dict[str, Any]) -> Dict[str, Any]:
        """
        Step 2: Create or update organization OAuth credentials

        Args:
            organization_id: The organization ID
            actor_type: "source" or "destination"
            source_name: Name of the source (e.g., "google-ads", "facebook-marketing")
            configuration: OAuth configuration specific to the source

        Returns:
            The decoded response body, or ``{"status": "success"}`` when the
            API answers with an empty body.

        Raises:
            ValueError: if no access token has been created yet
            requests.exceptions.HTTPError: if the API does not answer 2xx
        """
        if not self.access_token:
            raise ValueError("Access token not available. Call create_access_token() first.")
        url = self._url(f"/organizations/{organization_id}/oauthCredentials")
        payload = {
            "actorType": actor_type,
            "name": source_name,
            "configuration": {
                "credentials": configuration
            }
        }
        print(f"Step 2: Creating/updating OAuth credentials for {source_name}...")
        # The payload carries the OAuth app's secret, so mask it in the log.
        print(f"🔍 OAuth payload: {json.dumps(self._redact(payload), indent=2)}")
        response = self.session.put(url, json=payload, timeout=self.timeout)
        # Any 2xx counts as success: an update may answer with an empty body.
        if 200 <= response.status_code < 300:
            print(f"✅ OAuth credentials updated successfully for {source_name}")
            return response.json() if response.text else {"status": "success"}
        print(f"❌ Failed to update OAuth credentials: {response.status_code}")
        print(f"Response: {response.text}")
        if response.status_code == 403:
            print("⚠️ 403 Forbidden - This could be due to:")
            print(" - Invalid API key")
            print(" - Insufficient permissions for the organization")
            print(" - Organization ID not accessible with current credentials")
        self._raise_for_response(response)

    def initiate_oauth(self,
                       redirect_url: str,
                       workspace_id: str,
                       source_type: str,
                       oauth_input_configuration: Optional[str] = None) -> Dict[str, Any]:
        """
        Step 3: Initiate OAuth for a source

        Args:
            redirect_url: URL to redirect after OAuth completion
            workspace_id: The workspace ID to create the secret
            source_type: Type of source (e.g., "google-ads", "facebook-marketing")
            oauth_input_configuration: Optional OAuth input configuration

        Returns:
            The decoded response body of the initiate call.

        Raises:
            ValueError: if no access token has been created yet
            requests.exceptions.HTTPError: if the API does not answer 200
        """
        if not self.access_token:
            raise ValueError("Access token not available. Call create_access_token() first.")
        url = self._url("/sources/initiateOAuth")
        payload = {
            "redirectUrl": redirect_url,
            "workspaceId": workspace_id,
            "sourceType": source_type
        }
        if oauth_input_configuration:
            payload["oAuthInputConfiguration"] = oauth_input_configuration
        print(f"Step 3: Initiating OAuth for {source_type}...")
        response = self.session.post(url, json=payload, timeout=self.timeout)
        if response.status_code == 200:
            oauth_data = response.json()
            print(f"✅ OAuth initiated successfully for {source_type}")
            print(f"Redirect URL: {oauth_data.get('redirect_url', 'Not provided')}")
            return oauth_data
        print(f"❌ Failed to initiate OAuth: {response.status_code}")
        print(f"Response: {response.text}")
        self._raise_for_response(response)

    def run_complete_flow(self,
                          organization_id: str,
                          workspace_id: str,
                          source_type: str,
                          redirect_url: str,
                          oauth_configuration: Dict[str, Any],
                          oauth_input_configuration: Optional[str] = None) -> Dict[str, Any]:
        """
        Run the complete OAuth flow sequence

        Returns:
            A dict with the results of the three steps under the keys
            ``token_data``, ``credentials_data`` and ``oauth_data``.

        Raises:
            Whatever the failing step raised; the error is printed first and
            then re-raised unchanged.
        """
        try:
            print("🚀 Starting Airbyte OAuth Flow...")
            print("=" * 50)
            # Step 1: Create access token
            token_data = self.create_access_token()
            print("\n" + "=" * 50)
            # Step 2: Create/update OAuth credentials
            credentials_data = self.create_or_update_oauth_credentials(
                organization_id=organization_id,
                actor_type="source",
                source_name=source_type,
                configuration=oauth_configuration
            )
            print("\n" + "=" * 50)
            # Step 3: Initiate OAuth
            oauth_data = self.initiate_oauth(
                redirect_url=redirect_url,
                workspace_id=workspace_id,
                source_type=source_type,
                oauth_input_configuration=oauth_input_configuration
            )
            print("\n" + "=" * 50)
            print("✅ Complete OAuth flow finished successfully!")
            return {
                "token_data": token_data,
                "credentials_data": credentials_data,
                "oauth_data": oauth_data
            }
        except requests.exceptions.RequestException as e:
            print(f"❌ Request failed: {e}")
            raise
        except Exception as e:
            print(f"❌ Unexpected error: {e}")
            raise
def main() -> int:
    """
    Example usage of the AirbyteOAuthFlow class

    Reads the Airbyte credentials and IDs from the environment, runs the
    complete flow for a GitHub source and reports the outcome.

    Returns:
        0 when the flow completes, 1 when required configuration is missing
        or any step of the flow fails.
    """
    # Configuration - provide your actual values through these variables
    required_env = (
        "AIRBYTE_CLIENT_ID",
        "AIRBYTE_CLIENT_SECRET",
        "AIRBYTE_ORGANIZATION_ID",
        "AIRBYTE_WORKSPACE_ID",
    )
    # Fail fast: running with placeholder credentials can only produce a
    # confusing authentication error from the API.
    missing = [name for name in required_env if not os.getenv(name)]
    if missing:
        print(f"\n💥 OAuth flow failed: missing environment variables: {', '.join(missing)}")
        return 1
    CLIENT_ID = os.environ["AIRBYTE_CLIENT_ID"]
    CLIENT_SECRET = os.environ["AIRBYTE_CLIENT_SECRET"]
    ORGANIZATION_ID = os.environ["AIRBYTE_ORGANIZATION_ID"]
    WORKSPACE_ID = os.environ["AIRBYTE_WORKSPACE_ID"]
    # OAuth flow configuration
    SOURCE_TYPE = "github"  # Change to your desired source type
    REDIRECT_URL = "https://localhost/oauth/callback"  # Your OAuth callback URL
    # Example OAuth configuration for GitHub
    # This will vary depending on the source type
    OAUTH_CONFIGURATION = {
        "client_id": "your_github_oauth_client_id",
        "client_secret": "your_github_oauth_client_secret"
    }
    # Initialize the OAuth flow
    oauth_flow = AirbyteOAuthFlow(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET
    )
    try:
        # Run the complete flow
        oauth_flow.run_complete_flow(
            organization_id=ORGANIZATION_ID,
            workspace_id=WORKSPACE_ID,
            source_type=SOURCE_TYPE,
            redirect_url=REDIRECT_URL,
            oauth_configuration=OAUTH_CONFIGURATION
        )
        print("\n🎉 OAuth flow completed successfully!")
        print("You can now redirect users to the OAuth URL to complete authentication.")
    except Exception as e:
        # Top-level boundary: report the failure and turn it into an exit code.
        print(f"\n💥 OAuth flow failed: {e}")
        return 1
    return 0
# Script entry point: propagate main()'s return value as the process exit
# status. SystemExit is raised directly because the exit() helper is injected
# by the site module and is not guaranteed to exist (e.g. under `python -S`).
if __name__ == "__main__":
    raise SystemExit(main())
requests>=2.31.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment