Skip to content

Instantly share code, notes, and snippets.

@Jc2k
Last active April 1, 2016 13:06
Show Gist options
  • Save Jc2k/dd489cf0047b5a05d0be to your computer and use it in GitHub Desktop.
Save Jc2k/dd489cf0047b5a05d0be to your computer and use it in GitHub Desktop.
Get totals for how many hours an engineer spends on call for a given PagerDuty schedule
from __future__ import print_function, division
from decimal import Decimal
import argparse
import datetime
import getpass
import sys
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
import requests
# Quantization exponent: monetary amounts are rounded to two decimal places.
TWO_PLACES = Decimal('0.01')
def get_oncall_schedule(domain, api_key, schedule_id, since, until,
                        timezone="Europe/London", timeout=30):
    """Fetch the on-call entries of a PagerDuty schedule for a date range.

    Args:
        domain: Account subdomain, i.e. the ``{domain}`` part of
            ``https://{domain}.pagerduty.com``.
        api_key: API token sent in the ``Authorization`` header.
        schedule_id: Identifier of the schedule to query.
        since: Start of the range, sent as the ``since`` query parameter.
        until: End of the range, sent as the ``until`` query parameter.
        timezone: Value of the ``timezone`` query parameter. Defaults to
            ``"Europe/London"``, the value that used to be hard-coded.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the script indefinitely.

    Returns:
        The list stored under the ``entries`` key of the JSON response, or
        an empty list when that key is absent.

    Raises:
        requests.HTTPError: If the server replies with a 4xx/5xx status.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
    """
    url = 'https://{domain}.pagerduty.com/api/v1/schedules/{schedule_id}/entries'.format(
        domain=domain,
        schedule_id=schedule_id,
    )
    response = requests.get(
        url,
        params={
            "since": since,
            "until": until,
            "timezone": timezone,
        },
        headers={
            'Authorization': 'Token token={}'.format(api_key),
        },
        timeout=timeout,
    )
    # An error response (bad token, unknown schedule, ...) has no "entries"
    # key; without this check it would be indistinguishable from a period in
    # which genuinely nobody was on call.
    response.raise_for_status()
    return response.json().get('entries', [])
def tally_entries(entries):
    """Print a log line per shift and total the on-call time per engineer.

    Args:
        entries: Sequence of schedule entries. Each entry is a mapping with
            ``start`` and ``end`` timestamp strings (parsed with
            ``dateutil.parser.parse``) and a ``user`` mapping that holds an
            ``email``.

    Returns:
        A dict mapping each email address to the summed duration of its
        entries in seconds. Empty when ``entries`` is empty.
    """
    if not entries:
        # Nothing to report; avoids indexing into an empty sequence below.
        return {}

    # The first entry's start is used as the "since" date in running totals.
    period_start = parse(entries[0]['start'])
    tally = {}
    for entry in entries:
        email = entry['user']['email']
        start = parse(entry['start'])
        end = parse(entry['end'])
        print("{email} is on call from {start:%d-%m-%Y %H:%M:%S} to {end:%d-%m-%Y %H:%M:%S}".format(
            email=email,
            start=start,
            end=end,
        ))

        if start.tzinfo != end.tzinfo:
            print("{email} was on call during a timezone change from {orig} to {new}".format(
                email=email,
                orig=start.tzinfo,
                new=end.tzinfo,
            ))

        duration = (end - start).total_seconds()
        print("{email} is on call for {duration:.1f} hours".format(
            email=email,
            duration=((duration / 60.0) / 60.0),
        ))

        # The parentheses matter: ``duration % 60 * 60`` is evaluated as
        # ``(duration % 60) * 60`` and would only flag shifts that are not a
        # whole number of *minutes*.
        if duration % (60 * 60) > 0:
            print("{email} was not on call for a multiple of an hour".format(email=email))

        tally[email] = tally.get(email, 0) + duration
        print("{email} has been on call for {tally:.1f} hours since {start_date:%d-%m-%Y %H:%M:%S}".format(
            email=email,
            tally=((tally[email] / 60.0) / 60.0),
            start_date=period_start,
        ))
        print("")

    return tally
def print_totals(rate, tally):
    """Print each engineer's on-call hours and pay, then the grand total.

    Args:
        rate: Pay per 24 hours on call; any value ``Decimal()`` accepts.
        tally: Mapping of user identifier to on-call time in seconds.

    Pay is ``hours / 24 * rate`` rounded to two decimal places per user; the
    grand total is the sum of those rounded amounts.
    """
    grand_total = Decimal(0)

    print("Totals")
    print("======")
    print()

    for user, seconds in sorted(tally.items()):
        hours = seconds / 60 / 60
        print("{user} on call for {hours} hours (~{days:.1f} days)".format(
            user=user,
            days=hours / 24,
            hours=hours,
        ))

        # The rate is converted here, not up front, so it is only parsed
        # when there is at least one user to pay.
        money = (Decimal(hours) / Decimal(24) * Decimal(rate)).quantize(TWO_PLACES)
        grand_total += money
        print("{user} earns {money}".format(user=user, money=money))
        print()

    total_days = sum(tally.values()) / 60 / 60 / 24
    print("For a total of {:.1f} days and {}".format(
        total_days,
        grand_total.quantize(TWO_PLACES),
    ))
def month(value):
    """Convert *value* to a month number for use as an argparse ``type``.

    Returns the integer when it lies in 1..12; otherwise raises
    ``argparse.ArgumentTypeError``.
    """
    number = int(value)
    if not 1 <= number <= 12:
        raise argparse.ArgumentTypeError("Month must be between 1 and 12")
    return number
def year(value):
    """Convert *value* to a year for use as an argparse ``type``.

    Returns the integer when it is at most ten years away from the current
    year in either direction; otherwise raises
    ``argparse.ArgumentTypeError``.
    """
    number = int(value)
    current = datetime.datetime.now().year
    if abs(number - current) > 10:
        raise argparse.ArgumentTypeError("Year must be within a decade of this year")
    return number
def _pay_for_hours(hours, rate):
    """Return the pay for *hours* on call at *rate* per 24 hours, to 2 d.p."""
    return ((Decimal(hours) / Decimal(24)) * Decimal(rate)).quantize(TWO_PLACES)


def main(argv=None):
    """Command-line entry point: report on-call hours and pay for one month.

    Prompts for an API key, fetches the schedule entries for the selected
    month, prints the per-engineer tally and totals, then prints a
    "balancing" section comparing the hours in the month with the hours
    actually logged.

    Args:
        argv: Command-line arguments without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``.

    Raises:
        SystemExit: On invalid arguments, an empty API key, or when the
            schedule has no entries for the period.
    """
    from decimal import InvalidOperation

    # Imported up front so a missing pytz fails before any work is done.
    from pytz import timezone

    now = datetime.datetime.now()

    p = argparse.ArgumentParser(description='Work out how much to pay 24 hour party people')
    p.add_argument('domain')
    p.add_argument('schedule')
    p.add_argument('rate')
    p.add_argument('--year', action="store", type=year, default=now.year)
    p.add_argument('--month', action="store", type=month, default=now.month)
    # parse_args() itself falls back to sys.argv[1:] for None; passing argv
    # straight through means an explicit empty list is honoured too.
    params = p.parse_args(argv)

    # Validate the rate before prompting for credentials or calling the API,
    # rather than failing with a traceback halfway through the report.
    try:
        rate_is_valid = Decimal(params.rate).is_finite()
    except InvalidOperation:
        rate_is_valid = False
    if not rate_is_valid:
        p.error("rate must be a finite number, got {!r}".format(params.rate))

    start_date = datetime.date(params.year, params.month, 1)
    end_date = start_date + relativedelta(months=1)

    api_key = getpass.getpass("API Key: ")
    if not api_key:
        raise SystemExit("API Key is invalid")

    print("Hours on call from {start_date} to {end_date}".format(
        start_date=start_date,
        end_date=end_date,
    ))
    print("===========================================")
    print()

    entries = get_oncall_schedule(
        params.domain,
        api_key,
        params.schedule,
        start_date,
        end_date,
    )
    if not entries:
        raise SystemExit("No one was on call for this period")

    tally = tally_entries(entries)
    print_totals(params.rate, tally)

    print("")
    print("Balancing")
    print("=========")
    print("")

    # Calendar length of the month, ignoring any clock changes.
    naive_duration = end_date - start_date
    total_hours_naive = ((naive_duration.total_seconds() / 60) / 60)

    # localize() gives each midnight the UTC offset in force on that date, so
    # the difference below is real elapsed time across a DST change.
    el = timezone("Europe/London")
    midnight = datetime.datetime.min.time()
    local_start = el.localize(datetime.datetime.combine(start_date, midnight))
    local_end = el.localize(datetime.datetime.combine(end_date, midnight))
    total_hours = (((local_end - local_start).total_seconds() / 60) / 60)

    print("Between {} and {} there were {} hours".format(local_start, local_end, total_hours))
    print("({} / 24) * {} = {}".format(
        total_hours,
        params.rate,
        _pay_for_hours(total_hours, params.rate),
    ))

    if total_hours != total_hours_naive:
        print("During the reporting period there was a clock shift of {} hours".format(total_hours - total_hours_naive))

    print()

    logged_hours = (sum(tally.values()) / 60) / 60
    print("{} hours were logged in this period".format(logged_hours))
    print("({} / 24) * {} = {}".format(
        logged_hours,
        params.rate,
        _pay_for_hours(logged_hours, params.rate),
    ))
# Run the command-line entry point only when executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment