Created January 18, 2022 23:28
-
-
Save snydergd/ceb3a229e1b8d81dc1ecf50e6d5640dd to your computer and use it in GitHub Desktop.
StackOverflow Teams Reward Distribution Script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# NOTE: the shebang was `#!/bin/env python3`; `env` is at /usr/bin/env on
# virtually all systems, so the original failed when run as an executable.
import argparse
import json
import os
import random
from datetime import datetime, timedelta

import requests

# CLI: Stack Overflow Teams API credentials plus an optional report date
# (YYYY-MM-DD); when --run-date is omitted the Caller falls back to today.
parser = argparse.ArgumentParser()
parser.add_argument("--token")
parser.add_argument("--key")
parser.add_argument("--run-date", type=lambda s: datetime.strptime(s, '%Y-%m-%d'))
""" requirements.txt | |
certifi==2021.10.8 | |
requests==2.26.0 | |
# Indirect (pinned versions) | |
charset-normalizer==2.0.9 | |
idna==3.3 | |
urllib3==1.26.8 | |
""" | |
""" Example Config file (SO_awards.json): | |
{ | |
"exclusions": [ | |
{ "description": "Larry", "id": 13 }, | |
{ "description": "Curly", "id": 325 }, | |
{ "description": "Moe", "id": 6 } | |
], | |
"nominations": { | |
"202110": [ | |
{ "nominee": "Neo", "nominator": "Trinity" } | |
] | |
} | |
} | |
""" | |
CONFIG_URL = "http://some_location/data/SO_awards.json" | |
API_URL_BASE = "https://api.stackexchange.com/2.3" | |
API_PARAM_TEAM = "stackoverflow.com/c/fedins" | |
CACHE_FOLDER = ".cache" | |
def get_cache_file(name):
    """Return the path of the cache entry called *name* inside CACHE_FOLDER."""
    path = os.path.join(CACHE_FOLDER, name)
    return path
def get_cached_updatetime(name):
    """Return the mtime of the named cache entry as a naive local datetime.

    Raises OSError if the cache entry does not exist.
    """
    mtime = os.path.getmtime(get_cache_file(name))
    return datetime.fromtimestamp(mtime)
def get_with_cache(name, fn, kwargs=None, valid_for=timedelta(days=1), compare_to=None):
    """Return fn(**kwargs), memoized as JSON in CACHE_FOLDER/<name>.

    The cached copy is reused while its mtime + valid_for is still ahead of
    compare_to (default: the current time, evaluated per call); otherwise fn
    is invoked and its result written back to the cache.

    Fixes over the original:
    - `kwargs={}` was a shared mutable default argument.
    - `compare_to=datetime.now()` was evaluated once at import time, so every
      later call compared against a stale timestamp.
    """
    if kwargs is None:
        kwargs = {}
    if compare_to is None:
        compare_to = datetime.now()
    if not os.path.isdir(CACHE_FOLDER):
        os.mkdir(CACHE_FOLDER)
    filename = get_cache_file(name)
    if os.path.exists(filename) and get_cached_updatetime(name) + valid_for > compare_to:
        print(f"using cached {name}")
        with open(filename, "r") as f:
            result = json.load(f)
    else:
        print(f"recreating {name}")
        result = fn(**kwargs)
        try:
            with open(filename, "w") as f:
                json.dump(result, f)
        except TypeError:
            # fn returned something json can't serialize; remove the partial
            # file so a corrupt cache entry is never read back.
            os.remove(filename)
    return result
class Caller:
    """Wraps the Stack Overflow Teams API and the reward-report workflow."""

    # Date the report is run "as of"; all award windows derive from it.
    run_date: datetime

    def __init__(self, config):
        # config is the argparse.Namespace: token/key for the API,
        # run_date optional (defaults to today).
        self.token = config.token
        self.key = config.key
        if config.run_date:
            self.run_date = config.run_date
        else:
            self.run_date = datetime.today()

    def get_raw_data(self, endpoint, page=1, pagesize=30, params=None):  # defaults from SO docs
        """Fetch one page of `endpoint` and return the decoded JSON payload.

        `params=None` replaces the original shared mutable default dict.
        """
        params = {
            "key": self.key,
            "team": API_PARAM_TEAM,
            "site": "stackoverflow",
            "page": page,
            "pagesize": pagesize,
            **(params or {})
        }
        headers = {
            "X-API-Access-Token": self.token
        }
        request = requests.Request(method="GET", params=params, headers=headers, url=f"{API_URL_BASE}{endpoint}")
        prepared_request = request.prepare()
        print(prepared_request.url)
        session = requests.Session()
        response = session.send(prepared_request)
        return response.json()

    def get_all_data(self, endpoint, pagesize=100, params=None):  # maximum page size is defaulted here
        """Yield every item from `endpoint`, following `has_more` pagination.

        Best-effort: on any error the exception and the last payload are
        printed and iteration stops (matching the original's behavior of
        never crashing the run on an API error).
        """
        page = 1
        data = None
        try:
            while True:
                data = self.get_raw_data(endpoint, page, pagesize, params)
                yield from data["items"]
                # .get() also ends cleanly when "has_more" is absent, where
                # the original raised (and swallowed) a KeyError.
                if not data.get("has_more"):
                    break
                page += 1
        except Exception as e:
            print(e)
            print(json.dumps(data, indent=2))

    # For more information, see the APIs: https://api.stackexchange.com/docs
    # Also have a look at the teams-specific ones: https://api.stackexchange.com/docs/teams
    def get_previous_quarter(self):
        """Return (start, end) of the calendar quarter before run_date, end exclusive."""
        d = self.run_date
        year = d.year
        # First month (0-based) of the current quarter, minus one quarter.
        month = (d.month - 1) - (d.month - 1) % 3 - 3
        endyear = year
        endmonth = month + 3
        if month < 0:
            # Previous quarter spills into the previous year.
            month += 12
            year -= 1
        return (datetime(year, month + 1, 1), datetime(endyear, endmonth + 1, 1))

    def get_previous_month(self):
        """Return (start, end) of the calendar month before run_date, end exclusive."""
        month = self.run_date.month - 1
        year = self.run_date.year
        if month < 1:
            month = 12
            year -= 1
        return (datetime(year, month, 1), datetime(self.run_date.year, self.run_date.month, 1))

    def get_retrieval_timerange(self):
        """Return the single range covering both the previous month and quarter."""
        dates = self.get_previous_quarter() + self.get_previous_month()
        return (min(dates), max(dates))

    def get_user_list(self):
        """Return all team users, highest reputation first."""
        return list(self.get_all_data("/users", params={
            "filter": "!0ZJMp6Z5IQ3kGdOOMEatVA*mw",
            "order": "desc",
            "sort": "reputation"
        }))

    def get_rep_change_for_time_range_and_ids(self, ids, start, end):
        """Return reputation-history events for `ids` (";"-joined) in [start, end)."""
        return list(self.get_all_data(f"/users/{ids}/reputation-history", params={
            "fromdate": int(start.timestamp()),
            "todate": int(end.timestamp())
        }))

    def run(self):
        ### Report header: show the windows the report covers.
        print(f"== Report information {str(self.run_date)}")
        print(f"Previous month range: {[str(x) for x in self.get_previous_month()]}")
        print(f"Previous quarter range: {[str(x) for x in self.get_previous_quarter()]}")
        print(f"Range to retrieve: {[str(x) for x in self.get_retrieval_timerange()]}")
        ###
        print("")
        print("== Collecting data")
        a_week = timedelta(days=7)
        period = self.get_retrieval_timerange()
        # Fetch (or refresh) the user list BEFORE reading its cache mtime:
        # the original read the mtime first and crashed on a cold cache.
        users = get_with_cache("users", self.get_user_list, valid_for=a_week)
        user_cache_time = get_cached_updatetime("users")
        # The reputation-history endpoint accepts up to 100 ids per call.
        id_groups = [
            ";".join(str(u['user_id']) for u in users[a:a + 100])
            for a in range(0, len(users), 100)
        ]
        rep_data = []
        for (i, id_group) in enumerate(id_groups):
            cache_key = f"reputation_{i}"
            if os.path.exists(get_cache_file(cache_key)):
                # Invalidate when the user list is newer than this rep cache,
                # or when this rep cache is itself more than a week old.
                compare_to = min(user_cache_time, get_cached_updatetime(cache_key) + a_week)
            else:
                compare_to = user_cache_time
            rep = get_with_cache(
                cache_key,
                self.get_rep_change_for_time_range_and_ids,
                valid_for=timedelta(0),
                compare_to=compare_to,
                kwargs={
                    "ids": id_group,
                    "start": period[0],
                    "end": period[1]
                }
            )
            rep_data += rep
        # Aggregate per-user reputation change inside each award window.
        range_stats = {
            "quarterly": {
                "range": self.get_previous_quarter(),
                "data": {}
            },
            "monthly": {
                "range": self.get_previous_month(),
                "data": {}
            }
        }
        for rep in rep_data:
            d = datetime.fromtimestamp(rep['creation_date'])
            for interval in range_stats.values():
                # Half-open interval [start, end): the original used a strict
                # lower bound, silently dropping events at the period start.
                if interval["range"][0] <= d < interval["range"][1]:
                    user_id = rep['user_id']
                    if user_id not in interval["data"]:
                        interval["data"][user_id] = {'rep': 0, 'dates': [], 'id': user_id}
                    interval["data"][user_id]['rep'] += rep['reputation_change']
                    interval["data"][user_id]['dates'].append((rep['reputation_change'], rep['reputation_history_type'], str(d)))
        id_mapping = {item['user_id']: item for item in users}
        # Per category: [display_name, detail] pairs, highest rep change first.
        ratings = {
            category: [
                [id_mapping[uid]['display_name'], detail]
                for (uid, detail) in sorted(range_stats[category]["data"].items(), reverse=True, key=lambda x: x[1]['rep'])
            ]
            for category in range_stats.keys()
        }
        with open(get_cache_file("aggregate"), "w") as f:
            json.dump(ratings, f, indent=' ')
        ###
        print("")
        print("== Finding winners")
        nomination_month_key = self.run_date.strftime("%Y%m")
        config_data = requests.get(CONFIG_URL).json()
        nomination_data = config_data["nominations"]
        excluded_from_awards = [exclusion["id"] for exclusion in config_data["exclusions"]]
        print(f"These users are being excluded from awards {', '.join([x['description'] + ' (' + str(x['id']) + ')' for x in config_data['exclusions']])}")
        ###
        print("")
        print("== These are the winners!")
        top_10_month = [f"\n {rating[0]} - {rating[1]['rep']}" for rating in [x for x in ratings["monthly"] if x[1]['id'] not in excluded_from_awards][:10]]
        top_5_month = top_10_month[:5]
        top_2_quarter = [f"\n {rating[0]} - {rating[1]['rep']}" for rating in [x for x in ratings["quarterly"] if x[1]['id'] not in excluded_from_awards][:2]]
        print(f"Quarterly Top 2 (winner): {''.join(top_2_quarter)}")
        print(f"Monthly Top 5 (winner): {''.join(top_5_month)}")
        print(f"Monthly Top 10 for drawing: {''.join(top_10_month)}")
        if nomination_month_key in nomination_data:
            nominations = [f"\n {nomination['nominee']} (by {nomination['nominator']})" for nomination in nomination_data[nomination_month_key]]
            print(f"Monthly drawing entries from nominations: {''.join(nominations)}")
        else:
            nominations = []
            print("There are no additional drawing entries from nominations this month")
        pool = top_10_month + nominations
        # random.sample raises ValueError when the pool has fewer than 4
        # entries (small team / early months); cap the draw size instead.
        drawing_winners = random.sample(pool, min(4, len(pool)))
        print(f"Monthly drawing winners: {''.join(drawing_winners)}")
if __name__ == "__main__": | |
caller = Caller(parser.parse_args()) | |
caller.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment