-
-
Save datawithdev/81d146d2c4cde9ea3b825b3e0eef6a91 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List | |
import requests | |
API_ROOT = 'http://localhost:8000/api/v1' # This is the default for Airbyte | |
def get_workspaces() -> List[str]: | |
response = requests.post(f'{API_ROOT}/workspaces/list') | |
response.raise_for_status() # Either handle this yourself, or use a tool like Sentry for logging | |
return [ | |
workspace['workspaceId'] | |
for workspace in repsonse.json()['workspaces'] | |
] | |
def get_connections_for_workspace(workspace_id: str) -> List[str]: | |
response = requests.post( | |
f'{API_ROOT}/connections/list', | |
json={'workspaceId': workspace_id}, | |
) | |
response.raise_for_status() | |
return [ | |
connection['connectionId'] | |
for connection in response.json()['connections'] | |
if connection['status'] == 'active' # So we can still disable connections in the UI | |
] | |
def trigger_connection_sync(connection_id: str) -> dict: | |
response = requests.post( | |
f'{API_ROOT}/connections/sync', | |
json={'connectionId': connection_id}, | |
) | |
response.raise_for_status() | |
return response.json() | |
if __name__ == '__main__': | |
workspaces = get_workspaces() | |
connections = [] | |
for workspace_id in workspaces: | |
connections.extend(get_connections_for_workspace(workspace_id)) | |
for connection_id in connections: | |
trigger_connection_sync(connection_id) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment