Created
June 21, 2019 19:00
-
-
Save epicserve/167b35f2d19da368c849f522075a8121 to your computer and use it in GitHub Desktop.
Example on how to get Bitbucket Pipelines build data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from oauthlib.oauth2 import BackendApplicationClient | |
from requests_oauthlib import OAuth2Session | |
""" | |
In order to use this class you'll have to set up an API token first by going to https://bitbucket.org/account/user/oeie/api
and then adding an OAuth consumer. Please note this will not work if you don't provide a Callback URL when creating your
consumer. I just provided a generic URL since we don't need callbacks. The documentation for the Bitbucket API is
located here: https://developer.atlassian.com/bitbucket/api/2/reference/.
Example usage: | |
bitbucket = Bitbucket(os.environ['BITBUCKET_KEY'], os.environ['BITBUCKET_SECRET'], '<workplace>', '<repo_slug>') | |
bitbucket.get_pipeline_build_state('master') | |
""" | |
class Bitbucket:
    """Small Bitbucket API 2.0 client for reading Pipelines build data.

    An OAuth2 token is fetched with the backend-application flow as soon as
    the object is constructed, so creating an instance performs network I/O.
    """

    def __init__(self, client_id, client_secret, workplace, repo_slug):
        """Store the repository coordinates and fetch an OAuth2 access token.

        Args:
            client_id: Key of the Bitbucket OAuth consumer.
            client_secret: Secret of the Bitbucket OAuth consumer.
            workplace: First path segment of the repository URL (its owner).
            repo_slug: Second path segment of the repository URL.
        """
        self.workplace = workplace
        self.repo_slug = repo_slug
        self.token_url = 'https://bitbucket.org/site/oauth2/access_token'
        self.api_url = 'https://api.bitbucket.org/2.0/'
        # Upper bound on how many result pages a branch lookup will request.
        self.max_pages = 10
        self.client = BackendApplicationClient(client_id=client_id)
        self.oauth = OAuth2Session(client=self.client)
        self.oauth.fetch_token(
            token_url=self.token_url,
            client_id=client_id,
            client_secret=client_secret,
        )

    def get_api_url(self, url):
        """Return the absolute API URL for ``url``, a path relative to the repository."""
        return f'{self.api_url}repositories/{self.workplace}/{self.repo_slug}/{url}'

    @staticmethod
    def filter_pipelines_by_branch(pipelines, branch):
        """Return the pipelines whose ``target.ref_name`` equals ``branch``.

        Pipelines without a ``target`` or without a ``ref_name`` are skipped
        instead of raising ``KeyError``. Input order is preserved.
        """
        matching = []
        for pipeline in pipelines:
            target = pipeline.get('target') or {}
            # Require the key to exist so a missing ref_name never matches,
            # whatever value ``branch`` has.
            if 'ref_name' in target and target['ref_name'] == branch:
                matching.append(pipeline)
        return matching

    def get_pipelines(self, page=1):
        """Fetch one page of pipelines, requested newest first.

        Args:
            page: 1-based page number to request.

        Returns:
            The list under the response's ``values`` key, or an empty list
            when that key is absent.

        Raises:
            requests.HTTPError: If the API answers with a 4xx/5xx status.
        """
        url = f"{self.get_api_url('pipelines/')}?page={page}&size=100&sort=-created_on"
        response = self.oauth.get(url)
        # Fail loudly on auth/permission/not-found errors rather than hitting
        # an unrelated KeyError while reading the error payload.
        response.raise_for_status()
        return response.json().get('values', [])

    def get_latest_branch_build_data(self, branch):
        """Return the first pipeline found for ``branch``.

        Pages are scanned in order, up to ``self.max_pages``; scanning stops
        early once the API returns an empty page.

        Raises:
            Exception: If no pipeline for ``branch`` is found.
        """
        for page in range(1, self.max_pages + 1):
            pipelines = self.get_pipelines(page)
            if not pipelines:
                # An empty page means there is nothing further to scan.
                break
            builds = self.filter_pipelines_by_branch(pipelines, branch)
            if builds:
                return builds[0]
        raise Exception(f"Couldn't find any builds for the {branch} branch")

    def get_pipeline_build_state(self, branch):
        """Return the latest build data for ``branch``.

        This is the full pipeline dict from ``get_latest_branch_build_data``,
        not only a state field.
        """
        return self.get_latest_branch_build_data(branch)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment