Skip to content

Instantly share code, notes, and snippets.

@epicserve
Created June 21, 2019 19:00
Show Gist options
  • Save epicserve/167b35f2d19da368c849f522075a8121 to your computer and use it in GitHub Desktop.
Save epicserve/167b35f2d19da368c849f522075a8121 to your computer and use it in GitHub Desktop.
Example on how to get Bitbucket Pipelines build data
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
"""
In order to use this class you're have to setup an API token first by going to https://bitbucket.org/account/user/oeie/api.
And then add an OAuth consumer. Please note this will not work if you don't provide Callback URL when creating your
consumer. I just provided a generic URL since we don't need callbacks. The documentation for the bitbucket API is
located here, https://developer.atlassian.com/bitbucket/api/2/reference/.
Example usage:
bitbucket = Bitbucket(os.environ['BITBUCKET_KEY'], os.environ['BITBUCKET_SECRET'], '<workplace>', '<repo_slug>')
bitbucket.get_pipeline_build_state('master')
"""
class Bitbucket(object):
def __init__(self, client_id, client_secret, workplace, repo_slug):
self.workplace = workplace
self.repo_slug = repo_slug
self.token_url = 'https://bitbucket.org/site/oauth2/access_token'
self.api_url = 'https://api.bitbucket.org/2.0/'
self.max_pages = 10
self.client = BackendApplicationClient(client_id=client_id)
self.oauth = OAuth2Session(client=self.client)
self.oauth.fetch_token(
token_url=self.token_url,
client_id=client_id,
client_secret=client_secret
)
def get_api_url(self, url):
return f'{self.api_url}repositories/{self.workplace}/{self.repo_slug}/{url}'
@staticmethod
def filter_pipelines_by_branch(pipelines, branch):
pipeline_builds = []
for pipeline in pipelines:
pl_branch = pipeline['target']['ref_name']
if pl_branch == branch:
pipeline_builds.append(pipeline)
return pipeline_builds
def get_pipelines(self, page=1):
url = f"{self.get_api_url('pipelines/')}?page={page}&size=100&sort=-created_on"
data = self.oauth.get(url).json()
return data['values']
def get_latest_branch_build_data(self, branch):
for i in range(1, self.max_pages + 1):
print('page', i)
pipelines = self.get_pipelines(i)
builds = self.filter_pipelines_by_branch(pipelines, branch)
if builds:
return builds[0]
raise Exception(f"Couldn't find any builds for the {branch} branch")
def get_pipeline_build_state(self, branch):
return self.get_latest_branch_build_data(branch)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment