#!/usr/bin/python
import requests
import sys
import json
from datetime import date, timedelta
import pprint

#Your PagerDuty API key. A read-only key will work for this.
auth_token = 'API_KEY_HERE'
#The PagerDuty subdomain
pd_subdomain = 'YOUR_PD_SUBDOMAIN'
#The PagerDuty team id (leave blank for all teams)
pd_team = ''
## NOTE: Dates should be in ISO 8601 format: http://en.wikipedia.org/wiki/ISO_8601
#Start date
#Default value is: start_date = date.today().isoformat()
start_date = date.today().isoformat()
#End date
#Default value is: end_date = (date.today() + timedelta(180)).isoformat()
end_date = (date.today() + timedelta(180)).isoformat()
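## Example (hypothetical values, not part of the original defaults): to report on a fixed
## window instead of "today + 180 days", set explicit ISO 8601 dates, e.g.:
##   start_date = '2015-09-01'
##   end_date = '2016-02-27'  # keep the span at or under the API's 180-day maximum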
#Include all users or just the ones attached to an escalation policy/schedule
inclusive = False
#Include the header in the CSV output
header = True

##Output columns to file 'on_call_users.csv':
##user name, user email, team name, escalation policy name, escalation policy level, schedule name
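## A single output row might look like this (hypothetical data):
## "Jane Doe","jane@example.com","Operations","Ops Escalation","1","Ops Primary"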

HEADERS = {
    'Authorization': 'Token token={0}'.format(auth_token),
    'Content-type': 'application/json',
}

def get_users_and_escalation_policies():
    global pd_team
    user_data = get_all_eps(pd_team)
    user_data = convert_to_ascii(user_data)
    print "Processing..."
    if header:
        output = '"Person\'s name", "Person\'s email", "Team name", "Escalation Policy name", "Escalation Policy level", "Schedule name (if applicable)"' + "\n"
    else:
        output = ''
    # pprint.pprint(user_data)
    # sys.exit()
    for user in sorted(user_data.itervalues(), key=lambda e: e['name'].lower()): # Sort everything regardless of case
        entries = 0
        if 'data' in user:
            for team in sorted(user['data'], key=lambda e: e.lower()):
                for ep_name, ep_value in sorted(user['data'][team].iteritems()):
                    entries += 1
                    for ep_level, sched_names in sorted(ep_value.iteritems()):
                        for sched_name in sorted(sched_names, key=lambda e: e.lower()):
                            output += '"' + user['name'].replace('"', "''") + '","' + user['email'] + '","' + team.replace('"', "''") + '","' + ep_name.replace('"', "''") + '","' + str(ep_level) + '","' + sched_name.replace('"', "''") + '"' + "\n"
        if entries == 0 and inclusive:
            output += '"' + user['name'].replace('"', "''") + '","' + user['email'] + '","","","",""' + "\n"
    print "Writing results to file: on_call_users.csv"
    out_file = open('on_call_users.csv', 'a') # Append results; close to flush the buffer
    out_file.write(output)
    out_file.close()

# Recursively convert unicode keys and values to ASCII, dropping characters that won't encode
def convert_to_ascii(input):
    if isinstance(input, dict):
        return {convert_to_ascii(key): convert_to_ascii(value) for key, value in input.iteritems()}
    elif isinstance(input, list):
        return [convert_to_ascii(element) for element in input]
    elif isinstance(input, unicode):
        return input.encode('ascii', 'ignore')
    else:
        return input

## User stuff ##
def get_user_count():
    # Returns [total number of users, per-request limit] so we know how many pages to fetch
    user_count = requests.get(
        'https://{0}.pagerduty.com/api/v1/users'.format(pd_subdomain),
        headers=HEADERS,
    )
    return [user_count.json()['total'], user_count.json()['limit']]

def get_all_users():
    total_users = get_user_count()
    print("Total number of users: {0}".format(total_users[0]))
    users = {}
    for offset in xrange(0, total_users[0]):
        if offset % total_users[1] == 0: # Request the next page of users
            user_data = get_users(offset, total_users[1])
            for user in user_data:
                users[user['email']] = {}
                users[user['email']]['name'] = user['name']
                users[user['email']]['email'] = user['email']
    return users

def get_users(offset, limit):
    params = {
        'offset': offset,
        'limit': limit
    }
    users = requests.get(
        'https://{0}.pagerduty.com/api/v1/users'.format(pd_subdomain),
        headers=HEADERS,
        data=json.dumps(params),
    )
    return users.json()['users']

## Team stuff ##
def get_team_count():
    team_count = requests.get(
        'https://{0}.pagerduty.com/api/v1/teams'.format(pd_subdomain),
        headers=HEADERS,
    )
    return [team_count.json()['total'], team_count.json()['limit']]

def get_all_teams():
    total_teams = get_team_count()
    print("Number of teams: {0}".format(total_teams[0]))
    teams = {}
    for offset in xrange(0, total_teams[0]):
        if offset % total_teams[1] == 0:
            team_data = get_teams(offset, total_teams[1])
            for team in team_data:
                teams.setdefault(team['id'], team['name'])
    return teams

def get_teams(offset, limit):
    params = {
        'offset': offset,
        'limit': limit
    }
    teams = requests.get(
        'https://{0}.pagerduty.com/api/v1/teams'.format(pd_subdomain),
        headers=HEADERS,
        data=json.dumps(params),
    )
    return teams.json()['teams']

## Escalation Policy stuff ##
def get_ep_count(team):
    params = {
        'teams': team
    }
    ep_count = requests.get(
        'https://{0}.pagerduty.com/api/v1/escalation_policies'.format(pd_subdomain),
        headers=HEADERS,
        data=json.dumps(params),
    )
    return [ep_count.json()['total'], ep_count.json()['limit']]
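
## Sketch of the per-user structure built below (shape only, not part of the original gist):
##   users[email] = {
##       'name': ...,
##       'email': ...,
##       'data': {team_name: {ep_name: {ep_level: [schedule_name or '', ...]}}}
##   }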
def get_all_eps(pd_team):
    users = get_all_users()
    teams = {}
    if pd_team:
        all_teams = get_all_teams()
        teams[pd_team] = all_teams[pd_team]
    else:
        teams = get_all_teams()
    for team_id, team_name in teams.iteritems():
        total_eps = get_ep_count(team_id)
        print("Number of escalation policies for team {0}: {1}".format(team_name, total_eps[0]))
        for offset in xrange(0, total_eps[0]):
            if offset % total_eps[1] == 0: # For each x, request the next x
                ep_data = get_eps(team_id, offset, total_eps[1])
                for ep in ep_data: #For each escalation policy
                    i = 0 #So we can keep track of what level of an escalation policy we are on
                    for ep_level in ep['escalation_rules']: #For each escalation level in the escalation policy
                        i += 1 #Up it by one for each level we loop through
                        for ep_target in ep_level['targets']: #For each target in the escalation level
                            if ep_target['type'] == "schedule": #We need to fetch the users on a schedule
                                schedule = get_schedule_users(ep_target['id'])
                                try:
                                    for user in schedule: #For each user in the schedule
                                        users[user['email']].setdefault('data', {}) #Make sure we have a data dict under the user
                                        users[user['email']]['data'].setdefault(team_name, {}) #The key is the team name
                                        users[user['email']]['data'][team_name].setdefault(ep['name'], {}) #The key is the escalation policy name
                                        users[user['email']]['data'][team_name][ep['name']].setdefault(i, [])
                                        users[user['email']]['data'][team_name][ep['name']][i].append(ep_target['name']) #The key is the level on the escalation policy, value is the schedule name
                                except:
                                    continue
                            elif ep_target['type'] == "user": #A user is the target on the escalation policy
                                users[ep_target['email']].setdefault('data', {}) #Make sure we have a data dict under the user
                                users[ep_target['email']]['data'].setdefault(team_name, {}) #The key is the team name
                                users[ep_target['email']]['data'][team_name].setdefault(ep['name'], {}) #The key is the escalation policy name
                                users[ep_target['email']]['data'][team_name][ep['name']].setdefault(i, [])
                                users[ep_target['email']]['data'][team_name][ep['name']][i].append('')
    return users

def get_eps(team, offset, limit):
    params = {
        'offset': offset,
        'limit': limit,
        'teams': team
    }
    print("Offset: {0}".format(offset))
    ep = requests.get(
        'https://{0}.pagerduty.com/api/v1/escalation_policies'.format(pd_subdomain),
        headers=HEADERS,
        data=json.dumps(params),
    )
    return ep.json()['escalation_policies']

def get_schedule_users(scheduleID):
    global start_date, end_date
    if not start_date:
        start_date = date.today().isoformat()
    if not end_date:
        end_date = (date.today() + timedelta(180)).isoformat() ## API has a max of 180 day span in one call
    params = {
        'since': start_date,
        'until': end_date
    }
    schedule = requests.get(
        'https://{0}.pagerduty.com/api/v1/schedules/{1}/users'.format(pd_subdomain, scheduleID),
        headers=HEADERS,
        data=json.dumps(params),
    )
    try:
        return schedule.json()['users']
    except:
        print("Schedule ID {0} returned an error".format(scheduleID))
        return None

def main(argv=None):
    if argv is None:
        argv = sys.argv
    get_users_and_escalation_policies()

if __name__ == '__main__':
    sys.exit(main())
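
## Usage (a sketch; the script filename below is an assumption, not part of the gist):
##   python get_on_call_users.py
## Results are appended to on_call_users.csv in the current working directory.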