Created
May 22, 2024 22:44
-
-
Save FFY00/cf322dc62692f25ea321cffca3fa9e77 to your computer and use it in GitHub Desktop.
Incomplete WIP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dataclasses | |
import inspect | |
import json | |
import logging | |
import os | |
import pprint | |
import string | |
import subprocess | |
import textwrap | |
import types | |
from typing import Iterator, Protocol, Self | |
# Root logger verbosity is taken from the LOGLEVEL environment variable
# (upper-cased before use), falling back to INFO when it is not set.
logging.basicConfig(level=os.getenv('LOGLEVEL', 'INFO').upper())
class APIItem(types.SimpleNamespace): | |
@classmethod | |
def parse_list(cls, kind: str, data: dict[str, list[object]]) -> Self: | |
if 'edges' in data: | |
return [cls.parse_node(kind, item) for item in data['edges'] if item] | |
if 'nodes' in data: | |
return [cls.parse(kind, item) for item in data['nodes'] if item] | |
raise ValueError('No edges or nodes found.') | |
@classmethod | |
def parse(cls, kind: str, data: dict[str, object]) -> Self: | |
for key, value in data.items(): | |
if isinstance(value, dict): | |
if ('edges' in value) or ('nodes' in value): | |
data[key] = cls.parse_list(key, value) | |
return cls(kind=kind, **data) | |
@classmethod | |
def parse_node(cls, kind: str, data: dict[str, object]) -> Self: | |
return cls.parse(kind, data['node']) | |
class GithubClient: | |
def __init__(self, org: str, repo: str) -> None: | |
self._log = logging.getLogger(self.__class__.__name__) | |
def request(self, query: str, **fields: str) -> dict[str, object]: | |
cmd = [ | |
'gh', 'api', 'graphql', '--paginate', '--slurp', '-f', | |
'-F', f'{org=}', '-F', f'{repo=}', f'query={query}', | |
] | |
for name, value in fields.items(): | |
cmd += ['-F', f'{name}={value}'] | |
response = subprocess.check_output(cmd) | |
data = json.loads(response) | |
caller = inspect.stack()[1][3] | |
info_text = f'\nCALLER: {caller}\nDATA: {json.dumps(data, indent=2)}\n' | |
self._log.debug(textwrap.indent(info_text, prefix='> ').rstrip()) | |
return data | |
def get_issues(self) -> Iterator[APIItem]: | |
query = ''' | |
query($org: String, $repo: String, $endCursor: String) { | |
repository(owner: $org, name: $repo) { | |
issues(first:100, after:$endCursor, states:OPEN) { | |
edges { | |
node { | |
number | |
labels(first:100) { | |
edges { | |
node { | |
name | |
} | |
} | |
} | |
} | |
} | |
pageInfo { | |
hasNextPage | |
endCursor | |
} | |
} | |
} | |
} | |
''' | |
data = self.request(query) | |
for result in data: | |
issues = result['data']['repository']['issues'] | |
yield from APIItem.parse_edges('issues', issues) | |
def get_issue(self, number: int) -> APIItem: | |
query = ''' | |
query($org: String, $repo: String, $number: Int!) { | |
repository(owner: $org, name: $repo) { | |
issue(number: $number) { | |
number | |
labels(first:100) { | |
edges { | |
node { | |
name | |
} | |
} | |
} | |
} | |
} | |
} | |
''' | |
data = self.request(query, number=number) | |
assert len(data) == 1 | |
return APIItem.parse('issue', data[0]['data']['repository']['issue']) | |
def get_project(self, number: int) -> APIItem: | |
query = ''' | |
query($org: String, $repo: String, $number: Int!) { | |
organization(login: $org) { | |
projectV2(number: $number) { | |
id | |
number | |
title | |
fields(first:100) { | |
nodes { | |
... on ProjectV2SingleSelectField { | |
id | |
name | |
options { | |
id | |
name | |
} | |
} | |
} | |
} | |
} | |
} | |
} | |
''' | |
data = self.request(query, number=number) | |
assert len(data) == 1 | |
return APIItem.parse('projectV2', data[0]['data']['organization']['projectV2']) | |
def assign_item_to_project_field(self, project: APIItem, item: APIItem, field: APIItem, option: str) -> None: | |
query = ''' | |
mutation($org: String, $repo: String, projectId: ID!, itemId: ID!, fieldId: ID!) { | |
updateProjectV2ItemFieldValue( | |
input: { | |
projectId: $projectId | |
itemId: $itemId | |
fieldId: $fieldId | |
value: { | |
singleSelectOptionId: $singleSelectOptionId | |
} | |
} | |
) { | |
projectV2Item { | |
id | |
} | |
} | |
} | |
''' | |
data = self.request( | |
query, | |
projectId=project.id, | |
itemId=item.id, | |
fieldId=field.id, | |
singleSelectOptionId=option, | |
) | |
client = GithubClient('pyOpenSci', 'software-review')
project = client.get_project(7)
print(project.fields)

# Find the project field named 'Status'; if several match, the last one wins.
status = None
for field in project.fields:
    if field.name == 'Status':
        status = field
if status is None:
    # Fail with a clear message instead of a NameError further down.
    raise LookupError("Project has no field named 'Status'.")

# Map each option's display name to its id.
column_ids = {option.name: option.id for option in status.options}

# NOTE(review): this call is incomplete. assign_item_to_project_field takes
# (project, item, field, option), but only two arguments are given and the
# Status field sits in the `item` position, so it raises TypeError as written.
# Which item and which option are meant cannot be told from this file;
# TODO: supply them (column_ids presumably provides the option id - confirm).
client.assign_item_to_project_field(project, status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment