Last active
September 7, 2021 13:03
-
-
Save alexbasista/1c76865d4db2fad1c740eaaa0bb06808 to your computer and use it in GitHub Desktop.
Examples of interacting with Terraform Cloud/Enterprise API via Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import json | |
import requests | |
def create_team(hostname, org, name, **kwargs): | |
""" | |
POST /organizations/:organization_name/teams | |
""" | |
if kwargs.get('tfe_token'): | |
token = kwargs.get('tfe_token') | |
kwargs.pop('tfe_token') | |
elif os.getenv('TFE_TOKEN'): | |
token = os.getenv('TFE_TOKEN') | |
else: | |
print('ERROR: Missing API token.') | |
sys.exit() | |
header = { | |
'Authorization': 'Bearer ' + token, | |
'Content-Type': 'application/vnd.api+json' | |
} | |
teams_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'teams']) | |
payload = {} | |
data = {} | |
data['type'] = 'teams' | |
attributes = {} | |
attributes['name'] = name | |
if kwargs.get('organization_access'): | |
if not isinstance(kwargs.get('organization_access'), dict): | |
print("ERROR: 'organization_access' argument must be of type dict.") | |
sys.exit() | |
else: | |
attributes['organization-access'] = kwargs.get('organization_access') | |
if kwargs.get('visibility'): | |
attributes['visibility'] = kwargs.get('visibility') | |
data['attributes'] = attributes | |
payload['data'] = data | |
r = requests.post(url=teams_endpoint, headers=header, data=json.dumps(payload)) | |
r.raise_for_status() | |
return r | |
def create_oauth_client(hostname, org, name, service_provider, http_url, api_url, oauth_token_string, **kwargs): | |
""" | |
POST /organizations/:organization_name/oauth-clients | |
""" | |
if kwargs.get('tfe_token'): | |
token = kwargs.get('tfe_token') | |
kwargs.pop('tfe_token') | |
elif os.getenv('TFE_TOKEN'): | |
token = os.getenv('TFE_TOKEN') | |
else: | |
print('ERROR: Missing API token.') | |
sys.exit() | |
header = { | |
'Authorization': 'Bearer ' + token, | |
'Content-Type': 'application/vnd.api+json' | |
} | |
oc_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'oauth-clients']) | |
payload = {} | |
data = {} | |
data['type'] = 'oauth-clients' | |
attributes = {} | |
attributes['service-provider'] = service_provider | |
attributes['name'] = name | |
attributes['http-url'] = http_url | |
attributes['api-url'] = api_url | |
attributes['oauth-token-string'] = oauth_token_string | |
if kwargs.get('private_key'): | |
attributes['private-key'] = kwargs.get('private_key') | |
data['attributes'] = attributes | |
payload['data'] = data | |
r = requests.post(url=oc_endpoint, headers=header, data=json.dumps(payload)) | |
r.raise_for_status() | |
return r | |
def create_org(hostname, name, email, **kwargs): | |
""" | |
POST /organizations | |
""" | |
org_attributes_list = [ | |
'name', | |
'email', | |
'session_timeout', | |
'session_remember', | |
'collaborator_auth_policy', | |
'cost_estimation_enabled', | |
'owners_team_saml_role_id' | |
] | |
if kwargs.get('tfe_token'): | |
token = kwargs.get('tfe_token') | |
kwargs.pop('tfe_token') | |
elif os.getenv('TFE_TOKEN'): | |
token = os.getenv('TFE_TOKEN') | |
else: | |
print('ERROR: Missing API token.') | |
sys.exit() | |
header = { | |
'Authorization': 'Bearer ' + token, | |
'Content-Type': 'application/vnd.api+json' | |
} | |
org_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations']) | |
payload = {} | |
data = {} | |
data['type'] = 'organizations' | |
attributes = {} | |
attributes['name'] = name | |
attributes['email'] = email | |
for key, value in kwargs.items(): | |
if key in org_attributes_list: | |
attributes[key] = value | |
else: | |
print("WARNING: '{}' is an invalid key for Organizations API.".format(key)) | |
data['attributes'] = attributes | |
payload['data'] = data | |
r = requests.post(url=org_endpoint, headers=header, data=json.dumps(payload)) | |
return r |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import json | |
import requests | |
def _get_oauth_token_id(hostname, header, org, oauth_client_name): | |
""" | |
GET /organizations/:organization_name/oauth-clients | |
""" | |
oc_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'oauth-clients']) | |
r = requests.get(url=oc_endpoint, headers=header) | |
r.raise_for_status() | |
oc = [ i for i in r.json()['data'] if i['attributes']['name'] == oauth_client_name ] | |
return oc[0]['relationships']['oauth-tokens']['data'][0]['id'] | |
def create_policy_set(hostname, org, name, **kwargs): | |
""" | |
POST /organizations/:organization_name/policy-sets | |
""" | |
ps_attributes_list = [ | |
'name', | |
'description', | |
'global', | |
'policies_path' | |
] | |
ps_vcs_repo_attributes_list = [ | |
'branch', | |
'identifier', | |
'ingress_submodules', | |
'oauth_token_id' | |
] | |
ps_relationships_list = [ | |
'workspace_ids' | |
] | |
ps_kwargs_non_params = [ | |
'oauth_client_name' | |
] | |
if kwargs.get('tfe_token'): | |
token = kwargs.get('tfe_token') | |
kwargs.pop('tfe_token') | |
elif os.getenv('TFE_TOKEN'): | |
token = os.getenv('TFE_TOKEN') | |
else: | |
print('ERROR: Missing API token.') | |
sys.exit() | |
header = { | |
'Authorization': 'Bearer ' + token, | |
'Content-Type': 'application/vnd.api+json' | |
} | |
ps_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'policy-sets']) | |
payload = {} | |
data = {} | |
data['type'] = 'policy-sets' | |
attributes = {} | |
vcs_repo = {} | |
attributes['name'] = name | |
for key, value in kwargs.items(): | |
if key in ps_attributes_list: | |
attributes[key] = value | |
elif key in ps_vcs_repo_attributes_list: | |
vcs_repo[key] = value | |
elif key in ps_relationships_list: | |
pass | |
elif key in ps_kwargs_non_params: | |
pass | |
else: | |
print("WARNING: '{}' is an invalid key for Policy Sets API".format(key)) | |
if len(vcs_repo) > 0: | |
if not kwargs.get('oauth_token_id'): | |
if not kwargs.get('oauth_client_name'): | |
print("ERROR: A value is required for either 'oauth_token_id' or 'oauth_client_name' if a VCS repo identifier is specified.") | |
sys.exit() | |
else: | |
ot_id = _get_oauth_token_id(hostname=hostname, header=header, org=org, oauth_client_name=kwargs.get('oauth_client_name')) | |
vcs_repo['oauth-token-id'] = ot_id | |
else: | |
ot_id = kwargs.get('oauth_token_id') | |
vcs_repo['oauth-token-id'] = ot_id | |
attributes['vcs-repo'] = vcs_repo | |
if kwargs.get('workspace_ids'): | |
relationships = {} | |
workspaces = {} | |
workspaces_data = [] | |
for ws in kwargs.get('workspace_ids'): | |
obj = { 'id': ws, 'type': 'workspaces' } | |
workspaces_data.append(obj) | |
workspaces['data'] = workspaces_data | |
relationships['workspaces'] = workspaces | |
data['relationships'] = relationships | |
data['attributes'] = attributes | |
payload['data'] = data | |
r = requests.post(url=ps_endpoint, headers=header, data=json.dumps(payload)) | |
r.raise_for_status() | |
return r |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import json | |
import requests | |
def _get_workspace_id(hostname, header, org, name): | |
""" | |
GET /workspaces/:workspace_id | |
""" | |
ws_endpoint = '/'.join(['https:/', hostname, 'organizations', org, 'workspaces', name]) | |
r = requests.get(url=ws_endpoint, headers=header) | |
return r.json()['data']['id'] | |
def _lock_workspace(hostname, header, ws_id, reason='Locked by Python script.'): | |
""" | |
POST /workspaces/:workspace_id/actions/lock | |
""" | |
ws_lock_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'workspaces', ws_id, 'actions', 'lock']) | |
payload = { "reason": reason } | |
r = requests.post(url=ws_lock_endpoint, headers=header, data=json.dumps(payload)) | |
return r | |
def _unlock_workspace(hostname, header, ws_id): | |
""" | |
POST /workspaces/:workspace_id/actions/unlock | |
""" | |
ws_unlock_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'workspaces', ws_id, 'actions', 'unlock']) | |
r = requests.post(url=ws_unlock_endpoint, headers=header) | |
return r | |
def create_state_version(hostname, org, ws, serial, md5, state, **kwargs): | |
""" | |
POST /workspaces/:workspace_id/state-versions | |
""" | |
if kwargs.get('tfe_token'): | |
token = kwargs.get('tfe_token') | |
kwargs.pop('tfe_token') | |
elif os.getenv('TFE_TOKEN'): | |
token = os.getenv('TFE_TOKEN') | |
else: | |
print('ERROR: Missing API token.') | |
sys.exit() | |
header = { | |
'Authorization': 'Bearer ' + token, | |
'Content-Type': 'application/vnd.api+json' | |
} | |
ws_id = _get_workspace_id(hostname=hostname, header=header, org=org, name=ws) | |
sv_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'workspaces', ws_id, 'state-versions']) | |
payload = {} | |
data = {} | |
data['type'] = 'state-versions' | |
attributes = {} | |
attributes['serial'] = serial | |
attributes['md5'] = md5 | |
attributes['state'] = state | |
if kwargs.get('lineage'): | |
attributes['lineage'] = kwargs.get('lineage') | |
data['attributes'] = attributes | |
payload['data'] = data | |
r = requests.post(url=sv_endpoint, headers=header, data=json.dumps(payload)) | |
r.raise_for_status() | |
return r |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import json | |
import requests | |
def _does_workspace_exist(hostname, header, org, name): | |
""" | |
GET /organizations/:organization_name/workspaces/:name | |
""" | |
ws_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'workspaces', name]) | |
r = requests.get(url=ws_endpoint, headers=header) | |
if r.status_code == 404: | |
return False | |
elif r.status_code == 200: | |
return True | |
else: | |
print('ERROR: Received unexpected reponse code: {}'.format(r.status_code)) | |
return True | |
def create_workspace(hostname, org, name, **kwargs): | |
""" | |
POST /organizations/:organization_name/workspaces | |
""" | |
ws_attributes_list = [ | |
'name', | |
'agent_pool_id', | |
'allow_destroy_plan', | |
'auto_apply', | |
'description', | |
'execution_mode', | |
'file_triggers_enabled', | |
'source_name', | |
'source_url', | |
'queue_all_runs', | |
'speculative_enabled', | |
'terraform_version', | |
'trigger_prefixes' | |
'working_directory', | |
] | |
ws_vcs_repo_attributes_list = [ | |
'oauth_token_id', | |
'branch', | |
'default_branch', | |
'ingress_submodules', | |
'identifier' | |
] | |
if kwargs.get('tfe_token'): | |
token = kwargs.get('tfe_token') | |
kwargs.pop('tfe_token') | |
elif os.getenv('TFE_TOKEN'): | |
token = os.getenv('TFE_TOKEN') | |
else: | |
print("ERROR: Missing API token.") | |
sys.exit() | |
header = { | |
'Authorization': 'Bearer ' + token, | |
'Content-Type': 'application/vnd.api+json' | |
} | |
ws_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'workspaces']) | |
payload = {} | |
data = {} | |
data['type'] = 'workspaces' | |
attributes = {} | |
vcs_repo = {} | |
attributes['name'] = name | |
for key, value in kwargs.items(): | |
if key in ws_attributes_list: | |
attributes[key] = value | |
elif key in ws_vcs_repo_attributes_list: | |
vcs_repo[key] = value | |
else: | |
print("WARNING: '{}' is an invalid key for Workspaces API".format(key)) | |
if len(vcs_repo) > 0: | |
attributes['vcs-repo'] = vcs_repo | |
data['attributes'] = attributes | |
payload['data'] = data | |
if _does_workspace_exist(hostname=hostname, header=header, org=org, name=name): | |
print("ERROR: Workspace '{}' already exists".format(name)) | |
sys.exit() | |
r = requests.post(url=ws_endpoint, headers=header, data=json.dumps(payload)) | |
r.raise_for_status() | |
return r |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import json | |
import requests | |
def list_orgs(hostname, **kwargs): | |
""" | |
GET /organizations | |
""" | |
if kwargs.get('tfe_token'): | |
token = kwargs.get('tfe_token') | |
elif os.getenv('TFE_TOKEN'): | |
token = os.getenv('TFE_TOKEN') | |
else: | |
print("ERROR: Missing API token.") | |
sys.exit() | |
header = { | |
'Authorization': 'Bearer ' + token, | |
'Content-Type': 'application/vnd.api+json' | |
} | |
org_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations']) | |
r = requests.get(url=org_endpoint, headers=header) | |
r.raise_for_status() | |
return [ i['id'] for i in r.json()['data'] ] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import json | |
import requests | |
def _get_ps_id(hostname, header, org, name): | |
""" | |
GET /organizations/:organization_name/policy-sets | |
""" | |
ps_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'policy-sets']) | |
r = requests.get(url=ps_endpoint, headers=header) | |
ps_id = [ i['id'] for i in r.json()['data'] if i['attributes']['name'] == name ] | |
return ps_id[0] | |
def _get_psp_id(hostname, header, org, policy_set, parameter): | |
""" | |
GET /policy-sets/:policy_set_id/parameters | |
""" | |
ps_id = _get_ps_id(hostname=hostname, header=header, org=org, name=policy_set) | |
psp_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'policy-sets', ps_id, 'parameters']) | |
r = requests.get(url=psp_endpoint, headers=header) | |
psp_id = [ i['id'] for i in r.json()['data'] if i['attributes']['key'] == parameter ] | |
return psp_id[0] | |
def create(hostname, org, policy_set, key, value, sensitive=False, **kwargs): | |
""" | |
POST /policy-sets/:policy_set_id/parameters | |
""" | |
if kwargs.get('tfe_token'): | |
token = kwargs.get('tfe_token') | |
kwargs.pop('tfe_token') | |
elif os.getenv('TFE_TOKEN'): | |
token = os.getenv('TFE_TOKEN') | |
else: | |
print('ERROR: Missing API token.') | |
sys.exit() | |
header = { | |
'Authorization': 'Bearer ' + token, | |
'Content-Type': 'application/vnd.api+json' | |
} | |
ps_id = _get_ps_id(hostname=hostname, header=header, org=org, name=policy_set) | |
psp_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'policy-sets', ps_id, 'parameters']) | |
payload = {} | |
data = {} | |
data['type'] = 'vars' | |
attributes = {} | |
attributes['key'] = key | |
attributes['value'] = value | |
attributes['category'] = 'policy-set' | |
attributes['sensitive'] = sensitive | |
data['attributes'] = attributes | |
payload['data'] = data | |
r = requests.post(url=psp_endpoint, headers=header, data=json.dumps(payload)) | |
r.raise_for_status() | |
return r | |
def list(hostname, org, policy_set, **kwargs): | |
""" | |
GET /policy-sets/:policy_set_id/parameters | |
""" | |
if kwargs.get('tfe_token'): | |
token = kwargs.get('tfe_token') | |
kwargs.pop('tfe_token') | |
elif os.getenv('TFE_TOKEN'): | |
token = os.getenv('TFE_TOKEN') | |
else: | |
print('ERROR: Missing API token.') | |
sys.exit() | |
header = { | |
'Authorization': 'Bearer ' + token, | |
'Content-Type': 'application/vnd.api+json' | |
} | |
ps_id = _get_ps_id(hostname=hostname, header=header, org=org, name=policy_set) | |
psp_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'policy-sets', ps_id, 'parameters']) | |
r = requests.get(url=psp_endpoint, headers=header) | |
r.raise_for_status() | |
return r | |
def update(hostname, org, policy_set, parameter, **kwargs): | |
""" | |
PATCH /policy-sets/:policy_set_id/parameters/:parameter_id | |
""" | |
psp_attributes_list = [ | |
'key', | |
'value', | |
'sensitive' | |
] | |
if kwargs.get('tfe_token'): | |
token = kwargs.get('tfe_token') | |
kwargs.pop('tfe_token') | |
elif os.getenv('TFE_TOKEN'): | |
token = os.getenv('TFE_TOKEN') | |
else: | |
print('ERROR: Missing API token.') | |
sys.exit() | |
header = { | |
'Authorization': 'Bearer ' + token, | |
'Content-Type': 'application/vnd.api+json' | |
} | |
ps_id = _get_ps_id(hostname=hostname, header=header, org=org, name=policy_set) | |
psp_id = _get_psp_id(hostname=hostname, header=header, org=org, policy_set=policy_set, parameter=parameter) | |
psp_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'policy-sets', ps_id, 'parameters', psp_id]) | |
payload = {} | |
data = {} | |
data['type'] = 'vars' | |
data['id'] = psp_id | |
attributes = {} | |
for key, value in kwargs.items(): | |
if key in psp_attributes_list: | |
attributes[key] = value | |
else: | |
print("ERROR: '{}' is an invalid key for Policy Set Parameters API".format(key)) | |
attributes['category'] = 'policy-set' | |
data['attributes'] = attributes | |
payload['data'] = data | |
r = requests.patch(url=psp_endpoint, headers=header, data=json.dumps(payload)) | |
r.raise_for_status() | |
return r | |
def delete(hostname, org, policy_set, parameter, **kwargs): | |
""" | |
DELETE /policy-sets/:policy_set_id/parameters/:parameter_id | |
""" | |
if kwargs.get('tfe_token'): | |
token = kwargs.get('tfe_token') | |
kwargs.pop('tfe_token') | |
elif os.getenv('TFE_TOKEN'): | |
token = os.getenv('TFE_TOKEN') | |
else: | |
print('ERROR: Missing API token.') | |
sys.exit() | |
header = { | |
'Authorization': 'Bearer ' + token, | |
'Content-Type': 'application/vnd.api+json' | |
} | |
ps_id = _get_ps_id(hostname=hostname, header=header, org=org, name=policy_set) | |
psp_id = _get_psp_id(hostname=hostname, header=header, org=org, policy_set=policy_set, parameter=parameter) | |
psp_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'policy-sets', ps_id, 'parameters', psp_id]) | |
r = requests.delete(url=psp_endpoint, headers=header) | |
r.raise_for_status() | |
return r |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment