Created
August 7, 2015 22:19
-
-
Save JoshRosen/c897f433ebf617a6bba5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Spaghetti code to delete comments from AmplabJenkins.
""" | |
import os | |
import sys | |
import requests | |
from link_header import parse as parse_link_header | |
import logging | |
import json | |
# Root of the GitHub REST API; resource paths are appended to it directly.
BASE_URL = 'https://api.github.com/'
# Root of GitHub's OAuth web flow. The host must be github.com (a mirror/proxy
# hostname here would send credentials to a third party).
BASE_AUTH_URL = 'https://github.com/login/oauth/'
# Issue and pull-request collections of the apache/spark repository.
ISSUES_BASE = BASE_URL + "repos/apache/spark/issues"
PULLS_BASE = BASE_URL + "repos/apache/spark/pulls"
def github_request(resource, oauth_token=None, etag=None):
    """
    Request a resource given as a path relative to the GitHub API root.

    The path is appended to BASE_URL and the call is delegated to
    raw_github_request, whose result is returned unchanged.
    """
    full_url = BASE_URL + resource
    return raw_github_request(full_url, oauth_token, etag)
def raw_github_request(url, oauth_token=None, etag=None, method="get"):
    """
    Issue a single GET or DELETE request against an absolute GitHub URL.

    :param url: absolute URL to request
    :param oauth_token: optional OAuth token, sent as an "Authorization: token ..." header
    :param etag: optional ETag, sent as "If-None-Match" to make the request conditional
    :param method: "get" or "delete"
    :return: the response object on HTTP 200; None on HTTP 304, or when a
             DELETE is answered with HTTP 204
    :raises Exception: if the method is not supported, or for any other status code
    """
    headers = {}
    if etag is not None:
        headers['If-None-Match'] = etag
    if oauth_token is not None:
        headers["Authorization"] = "token %s" % oauth_token
    # Log a copy of the headers with the credential masked so that the OAuth
    # token never ends up in log output.
    loggable_headers = dict(headers)
    if "Authorization" in loggable_headers:
        loggable_headers["Authorization"] = "token <redacted>"
    logging.info("Requesting %s from GitHub with headers %s", url, loggable_headers)
    if method == "get":
        response = requests.get(url, headers=headers)
    elif method == "delete":
        response = requests.delete(url, headers=headers)
        # A DELETE that succeeds carries no body; report it as None.
        if response.status_code == 204:
            return None
    else:
        raise Exception("Unknown method!")
    if response.status_code == 304:
        # Not modified relative to the supplied ETag: nothing new to return.
        return None
    elif response.status_code == 200:
        return response
    else:
        raise Exception("Unexpected status code: %i\n%s" % (response.status_code, response.content))
def paginated_github_request(url, oauth_token=None, etag=None):
    """
    Fetch a paginated GitHub endpoint and concatenate the decoded JSON of every page.

    The first page is requested with the optional ETag; each further page is
    located through the rel="next" entry of the previous response's Link header.

    :return: None when the first request produced no response, otherwise a
             (decoded JSON, ETag of the first page) pair
    """
    first_page = raw_github_request(url, oauth_token, etag)
    if first_page is None:
        return None
    items = json.loads(first_page.content)
    first_etag = first_page.headers["ETag"]

    def find_next(page):
        # href of the rel="next" link, or None when there is no further page
        parsed = parse_link_header(page.headers.get('Link', ''))
        return next((entry.href for entry in parsed.links if entry.rel == 'next'), None)

    pending_url = find_next(first_page)
    while pending_url:
        page = raw_github_request(pending_url, oauth_token)
        items.extend(json.loads(page.content))
        pending_url = find_next(page)
    return items, first_etag
def _main(): | |
token = os.environ["AMPLAB_JENKINS_OAUTH_COMMENT_TOKEN"] | |
pr_number = int(sys.argv[1]) | |
comments_url = ISSUES_BASE + "/" + str(pr_number) + "/comments" | |
for comment in paginated_github_request(comments_url, token)[0]: | |
if comment['user']['login'] == "AmplabJenkins": | |
print raw_github_request(comment['url'], token, method="delete") | |
def _test(): | |
import doctest | |
(failure_count, test_count) = doctest.testmod() | |
if failure_count: | |
exit(-1) | |
if __name__ == "__main__":
    # Script entry point: run the doctests first (a failure exits before any
    # request is made), then perform the comment deletion.
    _test()
    _main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment