Created
October 8, 2020 08:13
-
-
Save anax32/501419330e77f72680567c70629d1aa0 to your computer and use it in GitHub Desktop.
Attempt to use the Azure Devops REST API to re-trigger a release.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
List and trigger releases from Azure DevOps.

REST API reference:
https://docs.microsoft.com/en-us/rest/api/azure/devops/?view=azure-devops-rest-6.0
https://docs.microsoft.com/en-us/rest/api/azure/devops/release/releases/create?view=azure-devops-rest-6.0

Python client library:
https://github.com/microsoft/azure-devops-python-api

Serialization package (msrest):
https://azuresdkdocs.blob.core.windows.net/$web/python/azure-appconfiguration/1.0.0b5/_modules/msrest/serialization.html
""" | |
import os | |
import sys | |
import logging | |
import json | |
from azure.devops.connection import Connection | |
from msrest.authentication import BasicAuthentication | |
# Configure the root logger: the threshold is read from $LOGLEVEL
# (falling back to WARN) and records are written to stdout using the
# logging module's default record format.
logger = logging.getLogger()
logger.setLevel(os.environ.get("LOGLEVEL", "WARN").upper())
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

# Azure DevOps project that the release queries below are issued against.
PROJECT_NAME = os.environ.get("PROJECT_NAME", "default")
def list_projects(connection):
    """
    print the name of every project returned by the core client

    Pages are fetched one at a time; as long as a page carries a
    non-empty continuation token the next page is requested with it.
    The printed index keeps counting across pages.
    """
    core = connection.clients.get_core_client()

    printed = 0
    page = core.get_projects()
    while page is not None:
        for project in page.value:
            print("project[%03i]: '%s'" % (printed, project.name))
            printed += 1

        token = page.continuation_token
        if token is None or token == "":
            # no token -> this was the final page
            break
        page = core.get_projects(continuation_token=token)
def list_releases(connection):
    """
    print the releases returned by the release client as indented JSON
    """
    # the release client (not the core client) is the one that serves releases
    client = connection.clients.get_release_client()
    response = client.get_releases()

    # convert each release model to a plain dict so json can serialise it
    payload = []
    for release in response.value:
        payload.append(release.as_dict())

    print(json.dumps(payload, indent=2))
def repeat_last_release(connection, service_name, environments):
    """
    find an active release of a service and report the environment
    endpoints that would have to be patched to re-run it

    Steps:
      1. look up the single release definition matching `service_name`
      2. list the active releases of that definition and take the first
         one returned (assumed to be the most recent -- TODO confirm the
         default query order of the API)
      3. select the environments of that release whose names appear in
         `environments`
      4. fetch each selected environment and print the REST URL a PATCH
         request would target

    NOTE: no PATCH request is sent; the URL is only printed.

    Args:
        connection: Azure DevOps connection exposing
            `clients_v6_0.get_release_client()`.
        service_name: search text used to find the release definition.
        environments: iterable of environment names (e.g. ["dev"]).

    Returns:
        None; all results are printed to stdout.

    Raises:
        LookupError: if the definition search does not yield exactly one
            definition, if there is no active release, if a requested
            environment is not part of the release, or if an environment
            cannot be fetched.
    """
    release_client = connection.clients_v6_0.get_release_client()

    definitions = release_client.get_release_definitions(
        PROJECT_NAME, search_text=service_name
    )
    # everything below assumes the search resolved to exactly one definition
    match_count = 0 if definitions is None else len(definitions)
    if match_count != 1:
        raise LookupError(
            "expected exactly one release definition matching '%s', found %i"
            % (service_name, match_count)
        )

    definition = definitions[0]
    definition_id = definition.id
    print("'%s' definition id: %03i" % (service_name, definition_id))

    print("DEFINITIONS---")
    print(json.dumps(definition.as_dict(), indent=2))
    print("---")

    # get the list of releases for this release definition
    releases = release_client.get_releases(
        project=PROJECT_NAME,
        definition_id=definition_id,
        status_filter="active",
    )
    if not releases:
        raise LookupError(
            "no active release found for definition id %i" % definition_id
        )

    release = releases[0]
    release_id = release.id

    print("RELEASES---")
    print(json.dumps(release.as_dict(), indent=2))
    print("---")

    # NOTE(review): the list call may return releases without their
    # environments unless an expand option is requested -- confirm against
    # the API; a missing list is treated as "no environments" here.
    available = {env.name: env for env in (release.environments or [])}
    wanted = list(environments)
    missing = [name for name in wanted if name not in available]
    if missing:
        raise LookupError(
            "release %i has no environment(s) named: %s"
            % (release_id, ", ".join(str(name) for name in missing))
        )
    selected = {name: available[name] for name in wanted}

    # model objects are not JSON serialisable, so dump their dict form
    print(
        "release environments:'%s'"
        % json.dumps(
            {name: env.as_dict() for name, env in selected.items()}, indent=2
        )
    )
    print("count: %i" % len(selected))

    for name, summary in selected.items():
        release_environment_id = summary.id

        # get the full environment object (a single object, not a list)
        environment = release_client.get_release_environment(
            project=PROJECT_NAME,
            release_id=release_id,
            environment_id=release_environment_id,
        )
        if environment is None:
            raise LookupError(
                "environment '%s' (id %i) of release %i could not be fetched"
                % (name, release_environment_id, release_id)
            )

        print("ENVIRONMENTS---")
        print(json.dumps(environment.as_dict(), indent=2))
        print("---")

        # PATCH target (the request itself is not issued)
        # NOTE(review): the organisation 'blaise-gcp' is hard-coded and is
        # independent of the organisation the connection was created for.
        print("patch")
        url = "https://vsrm.dev.azure.com/blaise-gcp/%s/_apis/Release/releases/%i/environments/%i?api-version=6.0" % (PROJECT_NAME, release_id, release_environment_id)
        print(url)
if __name__ == "__main__":
    # The personal access token and the organisation URL are taken from the
    # environment; a missing variable raises KeyError.
    pat = os.environ["AZURE_PAT"]
    org_url = os.environ["AZURE_DEVOPS_ORG_URL"]

    # Basic auth with an empty user name and the PAT as the password.
    devops = Connection(
        base_url=org_url,
        creds=BasicAuthentication('', pat),
    )

    # Show the accessible projects, then inspect the latest release of
    # the service in the "dev" environment.
    list_projects(devops)
    repeat_last_release(
        devops,
        service_name="BlaiseCaseHandler",
        environments=["dev"],
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment