|
#!/usr/bin/env python3 |
|
from typing import NamedTuple |
|
from dataclasses import dataclass |
|
import dataclasses as dcs |
|
|
|
from enum import Enum, IntEnum |
|
from dotenv import load_dotenv |
|
from github import Github, Repository |
|
import os |
|
import json |
|
import logging |
|
|
|
logging.basicConfig() |
|
|
|
logging.info('test') |
|
|
|
load_dotenv() |
|
|
|
# https://stackoverflow.com/a/51286749 |
|
class EnhancedJSONEncoder(json.JSONEncoder): |
|
def default(self, o): |
|
if dcs.is_dataclass(o): |
|
return dcs.asdict(o) |
|
return super().default(o) |
|
|
|
|
|
class Risk(IntEnum): |
|
HIGH = 3 |
|
MED = 2 |
|
QA = 1 |
|
GAS = 0 |
|
def __lt__(self, other): |
|
if self.__class__ is other.__class__: |
|
return self.value < other.value |
|
return NotImplemented |
|
|
|
class Label(Enum): |
|
PRIMARY_ISSUE = 'primary issue' |
|
SELECTED = 'selected for report' |
|
SPONSOR_CONFIRMED = 'sponsor confirmed' |
|
PARTIAL_75 = 'partial-75' |
|
PARTIAL_50 = 'partial-50' |
|
PARTIAL_25 = 'partial-25' |
|
UNSATISFACTORY = 'unsatisfactory' |
|
|
|
@dataclass |
|
class Issue: |
|
is_selected: bool |
|
gh_issue_number: int |
|
bug_id: int |
|
risk: Risk |
|
warden : str |
|
title: str |
|
is_hm: bool # is high or med |
|
share: float |
|
is_confirmed: bool |
|
credit_percent: float |
|
payout: float |
|
|
|
|
|
|
|
class HMBug(NamedTuple): |
|
primary_issue_number: int |
|
dupes_nums: list[int] |
|
dupes_wardens: list[str] |
|
risk: Risk |
|
title: str |
|
|
|
|
|
|
|
token = os.environ.get("GH_ACCESS_TOKEN") |
|
FINDINGS = 'findings' |
|
|
|
|
|
g = Github(token) |
|
|
|
RISK_DICT = { |
|
'3 (High Risk)': Risk.HIGH, |
|
'2 (Med Risk)':Risk.MED, |
|
'QA (Quality Assurance)': Risk.QA, |
|
'G (Gas Optimization)': Risk.GAS |
|
} |
|
|
|
RISK_SHARE_DICT = { |
|
Risk.MED: 3.0, |
|
Risk.HIGH: 10.0 |
|
} |
|
|
|
DUPLICATE = 'duplicate-' |
|
|
|
def get_risk(labels: list[str]) -> Risk: |
|
for l in labels: |
|
if l in RISK_DICT: |
|
return RISK_DICT[l] |
|
|
|
def get_primary_issue_id(labels: list[str]): |
|
for l in labels: |
|
if DUPLICATE in l: |
|
str_num = l.replace(DUPLICATE, '') |
|
return int(str_num) |
|
|
|
|
|
def search_repo(name: str) -> Repository: |
|
for repo in g.get_user().get_repos(): |
|
if name in repo.name.lower() and FINDINGS in repo.name.lower(): |
|
return repo |
|
|
|
|
|
def get_issues_mapping_efficiently(repo: Repository): |
|
id_to_handler: dict[int, str] = {} |
|
contents = repo.get_contents('data') |
|
paths: list[str] = [c.path for c in contents] |
|
clean_paths = [p.replace('data/','').replace('.json', '').replace('-Q', '') for p in paths] |
|
for p in clean_paths: |
|
if '.md' in p: |
|
continue |
|
parts = p.split('-') |
|
iid_str = parts[-1] |
|
handler = '-'.join(parts[:-1]) |
|
try: |
|
iid = int(iid_str) |
|
id_to_handler[iid] = handler |
|
except ValueError: |
|
continue |
|
return id_to_handler |
|
|
|
def get_issues(repo: Repository) -> list[Issue]: |
|
gh_issues = repo.get_issues(state='all') |
|
iid2handler = get_issues_mapping_efficiently(repo) |
|
issues: list[Issue] = [] |
|
for gh_issue in gh_issues: |
|
try: |
|
labels = [l.name for l in gh_issue.labels] |
|
if Label.UNSATISFACTORY in labels: |
|
continue |
|
issue_number = gh_issue.number |
|
warden = iid2handler[issue_number] |
|
bug_id = issue_number |
|
|
|
|
|
|
|
risk = get_risk(labels) |
|
if risk is None: |
|
logging.warn(f"skipping {issue_number} since it doesn't have a risk label") |
|
continue |
|
is_hm = risk >= Risk.MED |
|
is_selected = is_hm and gh_issue.state == 'open' |
|
is_confirmed = Label.SPONSOR_CONFIRMED.value in labels |
|
|
|
if is_hm: |
|
if is_selected: |
|
if Label.SELECTED.value not in labels: |
|
logging.warn(f"Warning: #{issue_number} is open but not labeled as selected (considering it selected)") |
|
else: |
|
bug_id = get_primary_issue_id(labels) |
|
title = gh_issue.title |
|
credit =1.0 |
|
if Label.PARTIAL_25.value in labels: |
|
credit = 0.25 |
|
elif Label.PARTIAL_50.value in labels: |
|
credit = 0.5 |
|
elif Label.PARTIAL_75.value in labels: |
|
credit = 0.75 |
|
|
|
|
|
issues.append( |
|
Issue(is_selected, issue_number, bug_id, risk, warden, title, is_hm, 0, is_confirmed, credit, 0) |
|
) |
|
except Exception as e: |
|
logging.exception(f"failed for #{gh_issue.number}") |
|
return issues |
|
|
|
def issues_to_bugs(issues: list[Issue]) -> list[HMBug]: |
|
bugs_dict = {i.gh_issue_number:HMBug(i.gh_issue_number, [], [],i.risk,i.title ) for i in issues if i.is_selected} |
|
dupes = [i for i in issues if not i.is_selected] |
|
for d in dupes: |
|
bug = bugs_dict[d.bug_id] |
|
bug.dupes_nums.append(d.gh_issue_number) |
|
bug.dupes_wardens.append(d.warden) |
|
return list(bugs_dict.values()) |
|
|
|
def main(keyword: str, hm_pot: float): |
|
repo = search_repo(keyword) |
|
issues = get_issues(repo) |
|
hm_issues = [i for i in issues if i.is_hm] |
|
calculate_shares(hm_issues) |
|
hm_issues = [i for i in hm_issues if i.share > 0] |
|
total_hm_shares = sum(i.share for i in hm_issues) |
|
calculate_payout(hm_issues, hm_pot, total_hm_shares) |
|
share_per_warden = get_shares_by_warden(hm_issues) |
|
with open(f"{keyword}.out.json", "w") as file: |
|
json.dump({'spw':share_per_warden,'total': total_hm_shares, 'issues': hm_issues}, file, cls=EnhancedJSONEncoder) |
|
print("done") |
|
|
|
|
|
def count_dupes(issues: list[Issue]): |
|
number2count = {} |
|
for i in issues: |
|
if i.bug_id not in number2count: |
|
number2count[i.bug_id] = 0 |
|
number2count[i.bug_id] += 1 |
|
# number2count[i.bug_id] += i.credit_percent |
|
return number2count |
|
|
|
def get_shares_by_warden(issues: list[Issue]): |
|
warden2share = {} |
|
warden2payout = {} |
|
for i in issues: |
|
if i.warden not in warden2share: |
|
warden2share[i.warden] = 0 |
|
warden2share[i.warden] += i.share |
|
if i.warden not in warden2payout: |
|
warden2payout[i.warden] = 0 |
|
warden2payout[i.warden] += i.payout |
|
return {"share":warden2share, "payout":warden2payout} |
|
|
|
|
|
def calculate_shares(issues: list[Issue], only_confirmed = True): |
|
confirmed = [i.bug_id for i in issues if i.is_selected] |
|
dupes_count = count_dupes(issues) |
|
for i in issues: |
|
if only_confirmed and i.bug_id not in confirmed: |
|
continue |
|
dupes = dupes_count[i.bug_id] |
|
share = (RISK_SHARE_DICT[i.risk] * 0.9 ** (dupes-1) / dupes) * i.credit_percent |
|
if i.is_selected: |
|
share *= 1.3 |
|
i.share = share |
|
|
|
def calculate_payout(issues: list[Issue], hm_pot: float,total_hm_shares: float): |
|
for i in issues: |
|
if not i.is_hm: |
|
continue |
|
i.payout = i.share * hm_pot / total_hm_shares |
|
|
|
|
|
contest_name = os.environ.get("CONTEST_NAME") |
|
hm_pot = int(os.environ.get("HMS_POT")) |
|
main(contest_name, hm_pot) |