Skip to content

Instantly share code, notes, and snippets.

@optilude
Created August 6, 2015 22:18
Show Gist options
  • Save optilude/bbdd7901fc0deb792d17 to your computer and use it in GitHub Desktop.
Save optilude/bbdd7901fc0deb792d17 to your computer and use it in GitHub Desktop.
jiraquery.py
# query.py
import itertools
import datetime
import dateutil.parser
import dateutil.tz
def to_datetime(date):
    """Promote a plain date to a datetime anchored at midnight."""
    midnight = datetime.datetime.min.time()
    return datetime.datetime.combine(date, midnight)
def strip_time(dt):
    """Return a version of the datetime with time set to zero.

    The parameter was renamed from ``datetime`` to ``dt``: the original
    name shadowed the imported ``datetime`` module inside this function.
    """
    return to_datetime(dt.date())
class IssueSnapshot(object):
    """A snapshot of the key fields of an issue at a point in its change history
    """

    def __init__(self, change, key, date, status, resolution, is_resolved):
        self.change = change
        self.key = key
        # Normalise every snapshot date to UTC so they compare consistently
        self.date = date.astimezone(dateutil.tz.tzutc())
        self.status = status
        self.resolution = resolution
        self.is_resolved = is_resolved

    def __hash__(self):
        # Snapshots hash by issue key alone
        return hash(self.key)

    def __repr__(self):
        template = "<IssueSnapshot change=%s key=%s date=%s status=%s resolution=%s is_resolved=%s>"
        return template % (
            self.change,
            self.key,
            self.date.isoformat(),
            self.status,
            self.resolution,
            self.is_resolved,
        )
class QueryManager(object):
    """Manage and execute queries
    """

    # Default settings; overridden per-instance via constructor kwargs
    settings = dict(
        project=None,
        issue_types=['Story'],
        valid_resolutions=["Done", "Wontfix"],
        epics=None,
        jql_filter=None,
        epic_link_field='Epic Link',
        release_field='Fix Version/s',
        size_field='Story Points',
        rank_field='Rank',
        team_field='Team',
        max_results=1000,
    )

    # Logical field name -> JIRA field id, resolved per instance in
    # resolve_fields()
    fields = dict(
        epic_link=None,
        release=None,
        size=None,
        rank=None,
        team=None,
    )

    def __init__(self, jira, **kwargs):
        """Bind to a `jira` client; keyword args override `settings` defaults."""
        self.jira = jira

        settings = self.settings.copy()
        settings.update(kwargs)
        self.settings = settings

        # Work on a per-instance copy: resolve_fields() assigns into this
        # dict, and mutating the shared class-level dict would leak resolved
        # field ids between QueryManager instances.
        self.fields = self.fields.copy()

        self.resolve_fields()

    # Helpers

    def resolve_fields(self):
        """Resolve each configured field name to its JIRA field id."""
        fields = self.jira.fields()
        for k in self.fields.keys():
            name = self.settings['%s_field' % k]
            # Raises StopIteration if the named field does not exist in JIRA
            self.fields[k] = next((f['id'] for f in fields if f['name'] == name))

    def iter_changes(self, issue, include_resolution_changes=True):
        """Yield an IssueSnapshot for each time the issue changed status or
        resolution
        """
        is_resolved = False

        # Find the first status change, if any.  Materialise the filter as a
        # list so indexing and len() work on Python 3, where filter() returns
        # a lazy iterator (this is also valid on Python 2).
        status_changes = list(filter(
            lambda h: h.field == 'status',
            itertools.chain.from_iterable([c.items for c in issue.changelog.histories])
        ))
        last_status = status_changes[0].fromString if len(status_changes) > 0 else issue.fields.status.name
        last_resolution = None

        # Issue was created
        yield IssueSnapshot(
            change=None,
            key=issue.key,
            date=dateutil.parser.parse(issue.fields.created),
            status=last_status,
            resolution=None,
            is_resolved=is_resolved
        )

        for change in issue.changelog.histories:
            change_date = dateutil.parser.parse(change.created)

            # list() again for Python 3 compatibility ([-1] indexing below)
            resolutions = list(filter(lambda i: i.field == 'resolution', change.items))
            is_resolved = (resolutions[-1].to is not None) if len(resolutions) > 0 else is_resolved

            for item in change.items:
                if item.field == 'status':
                    # Status was changed
                    last_status = item.toString
                    yield IssueSnapshot(
                        change=item.field,
                        key=issue.key,
                        date=change_date,
                        status=last_status,
                        resolution=last_resolution,
                        is_resolved=is_resolved
                    )
                elif item.field == 'resolution':
                    last_resolution = item.toString
                    if include_resolution_changes:
                        yield IssueSnapshot(
                            change=item.field,
                            key=issue.key,
                            date=change_date,
                            status=last_status,
                            resolution=last_resolution,
                            is_resolved=is_resolved
                        )

    # Basic queries

    def find_issues(self, jql=None, epics=None, order='KEY ASC'):
        """Return a list of issues with changelog metadata.

        Searches for the `issue_types`, `project` and `valid_resolutions`
        set in the settings for the query manager.

        Pass a JQL string to further qualify the query results.

        Pass a list of epics to search by epic link.
        """
        query = []

        query.append('issueType IN (%s)' % ', '.join(['"%s"' % t for t in self.settings['issue_types']]))
        query.append('(resolution IS EMPTY OR resolution IN (%s))' % ', '.join(['"%s"' % r for r in self.settings['valid_resolutions']]))

        if self.settings['project']:
            query.append('project = %s' % self.settings['project'])

        if self.settings['jql_filter'] is not None:
            query.append('(%s)' % self.settings['jql_filter'])

        if self.settings['epics'] is not None:
            query.append('%s in (%s)' % (self.settings['epic_link_field'], ', '.join([f.key for f in self.settings['epics']]),))

        if jql is not None:
            query.append('(%s)' % jql)

        if epics is not None:
            query.append('%s in (%s)' % (self.settings['epic_link_field'], ', '.join([f.key for f in epics]),))

        queryString = "%s ORDER BY %s" % (' AND '.join(query), order,)

        return self.jira.search_issues(queryString, expand='changelog', maxResults=self.settings['max_results'])
# cycletime.py
import pandas as pd
import numpy as np
class StatusTypes:
    """Names for the three kinds of step a cycle workflow can contain."""
    backlog = 'backlog'
    accepted = 'accepted'
    complete = 'complete'
class CycleTimeQueries(QueryManager):
    """Analysis for cycle time data, producing cumulative flow diagrams,
    scatter plots and histograms.

    Initialise with a `cycle`, a list of dicts representing the steps in
    a cycle. Each dict describes that step with keys `name`, `type` (one of
    "backlog", "accepted" or "complete" as per the `StatusTypes` enum) and
    `statuses` (a list of equivalent JIRA workflow statuses that map onto
    this step).
    """

    settings = dict(
        cycle=[  # flow steps, types, and mapped JIRA statuses
            {
                "name": 'todo',
                "type": StatusTypes.backlog,
                "statuses": ["Open", "To Do"],
            },
            {
                "name": 'analysis',
                "type": StatusTypes.accepted,
                "statuses": ["Analysis"],
            },
            {
                "name": 'analysis-done',
                "type": StatusTypes.accepted,
                "statuses": ["Analysis Done"],
            },
            {
                "name": 'development',
                "type": StatusTypes.accepted,
                "statuses": ["In Progress"],
            },
            {
                "name": 'done',
                "type": StatusTypes.complete,
                "statuses": ["Done", "Closed"],
            },
        ]
    )

    def __init__(self, jira, **kwargs):
        # Merge settings: QueryManager defaults <- subclass defaults <- kwargs
        settings = super(CycleTimeQueries, self).settings.copy()
        settings.update(self.settings.copy())
        settings.update(kwargs)

        # Reverse lookup: JIRA workflow status name -> cycle step metadata
        settings['cycle_lookup'] = {}
        for idx, cycle_step in enumerate(settings['cycle']):
            for status in cycle_step['statuses']:
                settings['cycle_lookup'][status] = dict(
                    index=idx,
                    name=cycle_step['name'],
                    type=cycle_step['type'],
                )

        super(CycleTimeQueries, self).__init__(jira, **settings)

    def cycle_data(self):
        """Build a numerically indexed data frame with the following 'fixed'
        columns: `key`, 'url', 'issue_type', `summary`, `status`, `resolution`,
        `size`, `release`, and `rank` from JIRA.

        In addition, `cycle_time` will be set to the time delta between the
        first `accepted`-type column and the first `complete` column, or None.

        The remaining columns are the names of the items in the configured
        cycle, in order.

        Each cell contains the last date/time stamp when the relevant status
        was set.

        If an item moves backwards through the cycle, subsequent date/time
        stamps in the cycle are erased.
        """
        data = []

        cycle_names = [s['name'] for s in self.settings['cycle']]
        accepted_steps = set(s['name'] for s in self.settings['cycle'] if s['type'] == StatusTypes.accepted)
        completed_steps = set(s['name'] for s in self.settings['cycle'] if s['type'] == StatusTypes.complete)

        for issue in self.find_issues():
            # Custom fields were resolved to ids by QueryManager.resolve_fields()
            size = getattr(issue.fields, self.fields['size'], None)
            release = getattr(issue.fields, self.fields['release'], None)
            rank = getattr(issue.fields, self.fields['rank'], None)
            team = getattr(issue.fields, self.fields['team'], None)

            item = {
                'key': issue.key,
                'url': "%s/browse/%s" % (self.jira._options['server'], issue.key,),
                'issue_type': issue.fields.issuetype.name,
                'summary': issue.fields.summary,
                'status': issue.fields.status.name,
                'resolution': issue.fields.resolution.name if issue.fields.resolution else None,
                'size': size.value if size else None,
                'release': release[0].name if release else None,
                'team': team.value if team else None,
                'rank': rank,
                'cycle_time': None,
                'completed_timestamp': None
            }

            for cycle_name in cycle_names:
                item[cycle_name] = None

            # Record date of status changes
            for snapshot in self.iter_changes(issue, False):
                cycle_step = self.settings['cycle_lookup'].get(snapshot.status, None)
                if cycle_step is None:
                    # Status not mapped to any configured cycle step; ignore
                    continue
                item[cycle_step['name']] = snapshot.date

            # Wipe timestamps if items have moved backwards; calculate cycle time
            previous_timestamp = None
            accepted_timestamp = None
            completed_timestamp = None

            for cycle_name in cycle_names:
                if (
                    item[cycle_name] is not None and
                    previous_timestamp is not None and
                    item[cycle_name] < previous_timestamp
                ):
                    item[cycle_name] = None

                if item[cycle_name] is not None:
                    previous_timestamp = item[cycle_name]

                if accepted_timestamp is None and previous_timestamp is not None and cycle_name in accepted_steps:
                    accepted_timestamp = previous_timestamp

                if completed_timestamp is None and previous_timestamp is not None and cycle_name in completed_steps:
                    completed_timestamp = previous_timestamp

            if accepted_timestamp is not None and completed_timestamp is not None:
                item['cycle_time'] = completed_timestamp - accepted_timestamp
                item['completed_timestamp'] = completed_timestamp

            data.append(item)

        return pd.DataFrame(data, columns=['key', 'url', 'issue_type', 'summary', 'status', 'resolution', 'size', 'team', 'release', 'rank', 'cycle_time', 'completed_timestamp'] + cycle_names)

    def cfd(self, cycle_data):
        """Return the data to build a cumulative flow diagram: a DataFrame,
        indexed by day, with columns containing cumulative counts for each
        of the items in the configured cycle.

        In addition, a column called `cycle_time` contains the approximate
        average cycle time of that day based on the first "accepted" status
        and the first "complete" status.
        """
        cycle_names = [s['name'] for s in self.settings['cycle']]
        cycle_start = next(s['name'] for s in self.settings['cycle'] if s['type'] == StatusTypes.accepted)
        cycle_end = next(s['name'] for s in self.settings['cycle'] if s['type'] == StatusTypes.complete)

        df = cycle_data[cycle_names]

        # Truncate each timestamp to day resolution
        df = pd.DataFrame(
            np.array(df.values, dtype='<M8[ns]').astype('<M8[D]').astype('<M8[ns]'),
            columns=df.columns,
            index=df.index
        )

        # Count arrivals per day per step, then accumulate
        df = pd.concat({col: df[col].value_counts() for col in df}, axis=1)
        df = df.fillna(0).cumsum(axis=0)

        # Reindex onto a continuous daily range, forward-filling gaps
        start, end = df.index.min(), df.index.max()
        df = df.reindex(pd.date_range(start, end, freq='D'), method='ffill')

        # NOTE(review): this subtracts cumulative *counts*, not timestamps;
        # per the docstring it is intended as an approximation.
        df['cycle_time'] = df[cycle_end] - df[cycle_start]

        return df

    def histogram(self, cycle_data, bins=10):
        """Return histogram data for the cycle times in `cycle_data`. Returns
        a dictionary with keys `bin_values` and `bin_edges` of numpy arrays
        """
        # Drop missing values first so NaT entries never reach the
        # day-resolution conversion (the original converted before dropping).
        values = cycle_data['cycle_time'].dropna().astype('timedelta64[D]')
        bin_values, bin_edges = np.histogram(values, bins=bins)
        return {
            'bin_values': bin_values,
            'bin_edges': bin_edges
        }

    def scatterplot(self, cycle_data, percentiles=(0.3, 0.5, 0.7, 0.85, 0.95,)):
        """Return scatterplot data for the cycle times in `cycle_data`.

        Return a dictionary with keys `series` (a list of dicts with keys
        `x`, `y` and the fields from each record in `cycle_data`) and
        `percentiles` (a series with percentile values as keys).
        """
        # Take an explicit copy so the column assignment below can never
        # write back into the caller's frame.
        data = cycle_data.dropna(subset=['cycle_time', 'completed_timestamp']) \
            .rename(columns={'cycle_time': 'y', 'completed_timestamp': 'x'}) \
            .copy()

        data['y'] = data['y'].astype('timedelta64[D]')

        return {
            'series': data.to_dict('records'),
            'percentiles': data['y'].quantile(percentiles)
        }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment