Skip to content

Instantly share code, notes, and snippets.

@dpatlut
Created May 15, 2024 00:06
Show Gist options
  • Save dpatlut/2794fd819b8b76036a7e7fb5546b590a to your computer and use it in GitHub Desktop.
Save dpatlut/2794fd819b8b76036a7e7fb5546b590a to your computer and use it in GitHub Desktop.
Astro API Onboarding Script
"""
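Creates a new workspace in your Astro organization, named by the
MY_NEW_WORKSPACE_NAME variable below, with 3 deployments:
- development
- staging
- production

The API token is read from the ASTRO_ORG_API_TOKEN environment variable and
the organization ID from the ASTRO_ORG_ID environment variable.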
"""
import os
import requests
# Credentials and the target organization are read from the environment;
# either value is None when its variable is not set.
API_TOKEN = os.environ.get("ASTRO_ORG_API_TOKEN")
ORGANIZATION_ID = os.environ.get("ASTRO_ORG_ID")

# Name of the workspace to create.
MY_NEW_WORKSPACE_NAME = "ad-astra-workspace"

# Names of the three deployments created inside that workspace.
(
    MY_DEPLOYMENT_NAME_DEV,
    MY_DEPLOYMENT_NAME_STAGING,
    MY_DEPLOYMENT_NAME_PROD,
) = ("development", "staging", "production")
# create a new workspace
r_create_workspace = requests.post(
    f"https://api.astronomer.io/platform/v1beta1/organizations/{ORGANIZATION_ID}/workspaces",
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    json={"name": MY_NEW_WORKSPACE_NAME},
    # requests never times out by default; do not hang forever on a stalled connection
    timeout=60,
)
# Stop on an HTTP error (for example a bad token or organization ID) instead
# of going on to look up a workspace that was never created.
r_create_workspace.raise_for_status()

# get workspace ID
r_workspaces_list = requests.get(
    f"https://api.astronomer.io/platform/v1beta1/organizations/{ORGANIZATION_ID}/workspaces",
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    timeout=60,
)
r_workspaces_list.raise_for_status()

# Take the ID of the first listed workspace whose name matches.
# NOTE(review): only the workspaces returned by this single request are
# searched; if the endpoint paginates, confirm the new workspace is always
# part of the first page.
workspaceId = next(
    (
        workspace["id"]
        for workspace in r_workspaces_list.json()["workspaces"]
        if workspace["name"] == MY_NEW_WORKSPACE_NAME
    ),
    None,
)
if workspaceId is None:
    # Without this check the script would fail later with a NameError when
    # the deployment configurations reference workspaceId.
    raise SystemExit(
        f"Could not find a workspace named {MY_NEW_WORKSPACE_NAME} "
        f"in organization {ORGANIZATION_ID}"
    )
# define the deployment configurations
def _shared_deployment_settings(is_development_mode):
    """Return a fresh dict of the settings common to every deployment.

    The only setting that differs between environments is
    ``isDevelopmentMode``, which is taken from *is_development_mode*.
    A new dict (with its own ``workerQueues`` list) is built on every call,
    so the returned configurations do not share nested objects.
    """
    default_worker_queue = {
        "astroMachine": "A5",
        "isDefault": True,
        "maxWorkerCount": 10,
        "minWorkerCount": 0,
        "name": "default",
        "workerConcurrency": 10,
    }
    return {
        "astroRuntimeVersion": "10.5.0",
        "cloudProvider": "AWS",
        "defaultTaskPodCpu": "1",
        "defaultTaskPodMemory": "2Gi",
        "executor": "CELERY",
        "isCiCdEnforced": False,
        "isDagDeployEnabled": True,
        "isDevelopmentMode": is_development_mode,
        "isHighAvailability": False,
        "resourceQuotaCpu": "2",
        "resourceQuotaMemory": "4Gi",
        "schedulerSize": "SMALL",
        "type": "STANDARD",
        "workspaceId": workspaceId,
        "region": "us-east-1",
        "workerQueues": [default_worker_queue],
    }


# The development deployment carries its name up front and runs in
# development mode.
MY_DEV_DEPLOYMENT_CONFIG = {
    "name": MY_DEPLOYMENT_NAME_DEV,
    **_shared_deployment_settings(True),
}

# Template for staging and production: no "name" key yet, development mode off.
MY_STAGE_AND_PROD_DEPLOYMENT_CONFIG = _shared_deployment_settings(False)
# Derive the staging and production configurations from the shared template:
# a shallow copy of the template with the deployment's own "name" added.
MY_STAGE_DEPLOYMENT_CONFIG = {
    **MY_STAGE_AND_PROD_DEPLOYMENT_CONFIG,
    "name": MY_DEPLOYMENT_NAME_STAGING,
}
MY_PROD_DEPLOYMENT_CONFIG = {
    **MY_STAGE_AND_PROD_DEPLOYMENT_CONFIG,
    "name": MY_DEPLOYMENT_NAME_PROD,
}

# Map each deployment name to its configuration, in creation order
# (development, staging, production).
DEPLOYMENTS_TO_CREATE = {
    config["name"]: config
    for config in (
        MY_DEV_DEPLOYMENT_CONFIG,
        MY_STAGE_DEPLOYMENT_CONFIG,
        MY_PROD_DEPLOYMENT_CONFIG,
    )
}
# create the deployments
for deployment_name, deployment_config in DEPLOYMENTS_TO_CREATE.items():
    r_create_deployment = requests.post(
        f"https://api.astronomer.io/platform/v1beta1/organizations/{ORGANIZATION_ID}/deployments",
        headers={"Authorization": f"Bearer {API_TOKEN}"},
        json=deployment_config,
        # requests never times out by default; do not hang forever on a stalled connection
        timeout=60,
    )
    # Abort on the first failed request so the success message below is only
    # printed when every deployment request was accepted.
    if not r_create_deployment.ok:
        raise SystemExit(
            f"Failed to create the {deployment_name} deployment: "
            f"HTTP {r_create_deployment.status_code} {r_create_deployment.text}"
        )

# Reached only when none of the requests above reported an HTTP error.
print(
    f"""Success! Created the {MY_NEW_WORKSPACE_NAME} workspace with the following deployments:
{MY_DEPLOYMENT_NAME_DEV}, {MY_DEPLOYMENT_NAME_STAGING}, {MY_DEPLOYMENT_NAME_PROD}"""
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment