Created
May 31, 2012 07:59
-
-
Save ynkdir/2841754 to your computer and use it in GitHub Desktop.
github.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import base64 | |
import email.generator | |
import email.mime.application | |
import email.mime.multipart | |
import email.mime.text | |
import io | |
import json | |
import re | |
import urllib.error | |
import urllib.parse | |
import urllib.request | |
class Github:
    """Small client for a handful of GitHub REST API endpoints.

    Requests are sent anonymously unless both a username and a password
    were supplied, in which case an HTTP Basic "Authorization" header is
    attached to every API request.
    """

    API_ROOT = "https://api.github.com"

    def __init__(self, username=None, password=None):
        """
        @param username str (optional) login used for Basic authentication
        @param password str (optional) password used for Basic authentication
        """
        self.username = username
        self.password = password

    # ------------------------------------------------------------------
    # Low-level HTTP helpers
    # ------------------------------------------------------------------

    def _api_url(self, template, **parts):
        """Build an API URL, percent-encoding every substituted path part.

        @param template str path template such as "/repos/{user}/{repo}"
        @param parts values substituted into the template
        @return str absolute URL below API_ROOT
        """
        quoted = {key: urllib.parse.quote(str(value), safe="")
                  for key, value in parts.items()}
        return self.API_ROOT + template.format(**quoted)

    def http_request(self, url, method=None):
        """Create a Request for url, adding Basic auth when configured.

        @param url str
        @param method str (optional) HTTP verb; urllib picks GET/POST if None
        @return urllib.request.Request
        """
        req = urllib.request.Request(url, method=method)
        if self.username is not None and self.password is not None:
            seed = self.username + ":" + self.password
            token = base64.b64encode(seed.encode("utf-8")).decode("utf-8")
            req.add_header("Authorization", "Basic " + token)
        return req

    def http_get(self, url):
        """Send GET and return the open response (caller closes it)."""
        return urllib.request.urlopen(self.http_request(url, "GET"))

    def http_post(self, url, data):
        """Send POST with body data (bytes) and return the open response."""
        return urllib.request.urlopen(self.http_request(url, "POST"), data)

    def http_put(self, url):
        """Send PUT with an empty body and return the open response."""
        return urllib.request.urlopen(self.http_request(url, "PUT"), b"")

    def http_delete(self, url):
        """Send DELETE and return the open response (caller closes it)."""
        return urllib.request.urlopen(self.http_request(url, "DELETE"))

    def _exists(self, url):
        """Return True if GET url succeeds, False if it answers 404.

        Any other HTTP error is re-raised. The response is always closed.
        """
        try:
            with self.http_get(url):
                return True
        except urllib.error.HTTPError as err:
            if err.code == 404:  # Not Found
                err.close()
                return False
            raise

    def _encode_multipart_form_data(self, params):
        """Serialize params as a multipart/form-data body.

        @param params list [(key1, value1), (key2, value2), ...]
        @return tuple (content_type str, body bytes)
        """
        msg = email.mime.multipart.MIMEMultipart("form-data")
        for name, value in params:
            if isinstance(value, bytes):
                # NOTE(review): MIMEApplication base64-encodes the payload;
                # confirm the receiving server decodes it as expected.
                part = email.mime.application.MIMEApplication(value)
            else:
                part = email.mime.text.MIMEText(value)
            part.add_header("Content-Disposition", "form-data", name=name)
            msg.attach(part)
        # The generator chooses the boundary and stores it on msg while
        # flattening, so msg["Content-Type"] is only complete afterwards.
        buf = io.BytesIO()
        email.generator.BytesGenerator(buf).flatten(msg, linesep="\r\n")
        # Drop the top-level headers; they are sent as HTTP headers instead.
        _headers, _sep, body = buf.getvalue().partition(b"\r\n\r\n")
        return msg["Content-Type"], body

    def http_post_multipart_form_data(self, url, params):
        """
        POST params as multipart/form-data. No Authorization header is sent.

        @param url str
        @param params list [(key1, value1), (key2, value2), ...]
        @return the open HTTP response (caller closes it)
        """
        content_type, content = self._encode_multipart_form_data(params)
        req = urllib.request.Request(url, content)
        req.add_header("Content-Type", content_type)
        return urllib.request.urlopen(req)

    # ------------------------------------------------------------------
    # Pagination and JSON helpers
    # ------------------------------------------------------------------

    def parse_linkheader(self, s):
        """Parse an HTTP "Link" header value.

        Each "<url>; key=value; ..." entry becomes a dict holding "url" plus
        one lower-cased key per parameter. Entries are located by their
        "<...>" part, so commas inside a URL do not split an entry, and text
        without any "<...>" yields no entry at all.

        @param s str header value (None or "" gives an empty list)
        @return list of dict
        """
        res = []
        for entry in re.finditer(r"<([^>]*)>([^<]*)", s or ""):
            e = {"url": entry.group(1)}
            params = re.finditer(r'(\w+)\s*=\s*(?:"([^"]*)"|([^\s;,"]+))',
                                 entry.group(2))
            for p in params:
                quoted, bare = p.group(2), p.group(3)
                e[p.group(1).lower()] = quoted if quoted is not None else bare
            res.append(e)
        return res

    def _next_page_url(self, linkheader):
        """Return the URL of the rel="next" link, or None if there is none."""
        if not linkheader:
            return None
        for link in self.parse_linkheader(linkheader):
            # rel may be missing or hold several space-separated relations.
            if "next" in link.get("rel", "").split():
                return link["url"]
        return None

    def fetch_allpages(self, url):
        """GET url and every following page; return all items in one list.

        Pages are followed through the rel="next" entry of the "Link"
        response header until a response has no such entry.
        """
        items = []
        nexturl = url
        while nexturl is not None:
            with self.http_get(nexturl) as res:
                data = json.loads(res.read().decode("utf-8"))
                linkheader = res.info().get("link")
            items.extend(data)
            nexturl = self._next_page_url(linkheader)
        return items

    def fetch_one(self, url):
        """GET url and return the decoded JSON body.

        Returns None when the response body is empty (instead of failing
        in the JSON decoder). HTTP errors propagate as HTTPError.
        """
        with self.http_get(url) as res:
            body = res.read()
        if not body.strip():
            return None
        return json.loads(body.decode("utf-8"))

    # ------------------------------------------------------------------
    # Issues
    # ------------------------------------------------------------------

    def issue_list(self, user, repo):
        """Return open and closed issues of a repo, sorted by number."""
        url = self._api_url("/repos/{user}/{repo}/issues",
                            user=user, repo=repo)
        items = []
        for state in ("open", "closed"):
            # 100 is the largest page size the API documents.
            query = urllib.parse.urlencode(
                [("per_page", 100), ("state", state)])
            items.extend(self.fetch_allpages(url + "?" + query))
        items.sort(key=lambda e: e["number"])
        return items

    def issue_get(self, user, repo, number):
        """Return one issue as decoded JSON."""
        return self.fetch_one(self._api_url(
            "/repos/{user}/{repo}/issues/{number}",
            user=user, repo=repo, number=number))

    def issue_comment_list(self, user, repo, number):
        """Return all comments of one issue."""
        return self.fetch_allpages(self._api_url(
            "/repos/{user}/{repo}/issues/{number}/comments",
            user=user, repo=repo, number=number))

    def issue_comment_get(self, user, repo, id_):
        """Return one issue comment as decoded JSON."""
        return self.fetch_one(self._api_url(
            "/repos/{user}/{repo}/issues/comments/{id}",
            user=user, repo=repo, id=id_))

    # ------------------------------------------------------------------
    # Organizations and teams
    # ------------------------------------------------------------------

    def org_member_list(self, org):
        """Return all members of an organization."""
        return self.fetch_allpages(
            self._api_url("/orgs/{org}/members", org=org))

    def org_member_get(self, org, user):
        """GET the membership resource; None if the response has no body."""
        return self.fetch_one(self._api_url(
            "/orgs/{org}/members/{user}", org=org, user=user))

    def org_public_member_list(self, org):
        """Return all public members of an organization."""
        return self.fetch_allpages(
            self._api_url("/orgs/{org}/public_members", org=org))

    def org_public_member_get(self, org, user):
        """GET the public membership resource; None if it has no body."""
        return self.fetch_one(self._api_url(
            "/orgs/{org}/public_members/{user}", org=org, user=user))

    def org_team_list(self, org):
        """Return all teams of an organization."""
        return self.fetch_allpages(
            self._api_url("/orgs/{org}/teams", org=org))

    def team_get(self, id_):
        """Return one team as decoded JSON."""
        return self.fetch_one(self._api_url("/teams/{id}", id=id_))

    def team_member_list(self, id_):
        """Return all members of a team."""
        return self.fetch_allpages(
            self._api_url("/teams/{id}/members", id=id_))

    def team_member_get(self, id_, user):
        """GET the team membership resource; None if it has no body."""
        return self.fetch_one(self._api_url(
            "/teams/{id}/members/{user}", id=id_, user=user))

    def team_member_add(self, id_, user):
        """PUT the team membership; returns the open HTTP response."""
        return self.http_put(self._api_url(
            "/teams/{id}/members/{user}", id=id_, user=user))

    # ------------------------------------------------------------------
    # Repository downloads
    # ------------------------------------------------------------------

    def repo_download_list(self, user, repo):
        """Return all downloads of a repo (paginated like other lists)."""
        return self.fetch_allpages(self._api_url(
            "/repos/{user}/{repo}/downloads", user=user, repo=repo))

    def repo_download_get(self, user, repo, id_):
        """Return one download as decoded JSON."""
        return self.fetch_one(self._api_url(
            "/repos/{user}/{repo}/downloads/{id}",
            user=user, repo=repo, id=id_))

    def repo_download_create(self, user, repo, name, size,
                             description=None, content_type=None):
        """
        Create the download resource and return the decoded JSON reply.

        @param name str
        @param size int Size of file in bytes.
        @param description str (optional)
        @param content_type str (optional)
        """
        url = self._api_url("/repos/{user}/{repo}/downloads",
                            user=user, repo=repo)
        params = {"name": name, "size": int(size)}
        if description is not None:
            params["description"] = description
        if content_type is not None:
            params["content_type"] = content_type
        reqdata = json.dumps(params).encode("utf-8")
        with self.http_post(url, reqdata) as res:
            return json.loads(res.read().decode("utf-8"))

    def repo_download_delete(self, user, repo, id_):
        """DELETE one download; returns the open HTTP response."""
        return self.http_delete(self._api_url(
            "/repos/{user}/{repo}/downloads/{id}",
            user=user, repo=repo, id=id_))

    def repo_download_upload(self, user, repo, name, data,
                             description=None, content_type=None):
        """
        Create a download, then POST the file to the returned "s3_url".

        @param name str
        @param data bytes
        @param description str (optional)
        @param content_type str (optional)
        @return the open HTTP response of the upload (caller closes it)
        """
        download = self.repo_download_create(user, repo, name, len(data),
                                             description, content_type)
        # The form fields are taken from the reply of the create call.
        return self.http_post_multipart_form_data(download["s3_url"], [
            ("key", download["path"]),
            ("acl", download["acl"]),
            ("success_action_status", "201"),
            ("Filename", download["name"]),
            ("AWSAccessKeyId", download["accesskeyid"]),
            ("Policy", download["policy"]),
            ("Signature", download["signature"]),
            ("Content-Type", download["mime_type"]),
            ("file", data)])

    # ------------------------------------------------------------------
    # Watching
    # ------------------------------------------------------------------

    def repo_watcher_list(self, user, repo):
        """Return all watchers of a repo."""
        return self.fetch_allpages(self._api_url(
            "/repos/{user}/{repo}/watchers", user=user, repo=repo))

    def user_watching_list(self, user):
        """Return all repos watched by user."""
        return self.fetch_allpages(
            self._api_url("/users/{user}/watched", user=user))

    def user_watching_repo(self, user, repo):
        """Check if authenticated user is watching repo.

        @return bool False on HTTP 404, True on success; other HTTP
                errors are raised as urllib.error.HTTPError
        """
        return self._exists(self._api_url(
            "/user/watched/{user}/{repo}", user=user, repo=repo))

    def user_watch(self, user, repo):
        """Let authenticated user watch repo; returns the open response."""
        return self.http_put(self._api_url(
            "/user/watched/{user}/{repo}", user=user, repo=repo))

    def user_stopwatch(self, user, repo):
        """Let authenticated user stop watching repo; returns the response."""
        return self.http_delete(self._api_url(
            "/user/watched/{user}/{repo}", user=user, repo=repo))

    # ------------------------------------------------------------------
    # Following
    # ------------------------------------------------------------------

    def user_follower_list(self, user):
        """Return all followers of user."""
        return self.fetch_allpages(
            self._api_url("/users/{user}/followers", user=user))

    def user_following_list(self, user):
        """Return all users that user is following."""
        return self.fetch_allpages(
            self._api_url("/users/{user}/following", user=user))

    def user_following_user(self, user):
        """Check if authenticated user is following user.

        @return bool False on HTTP 404, True on success; other HTTP
                errors are raised as urllib.error.HTTPError
        """
        return self._exists(
            self._api_url("/user/following/{user}", user=user))

    def user_follow(self, user):
        """Let authenticated user follow user; returns the open response."""
        return self.http_put(
            self._api_url("/user/following/{user}", user=user))

    def user_unfollow(self, user):
        """Let authenticated user unfollow user; returns the open response."""
        return self.http_delete(
            self._api_url("/user/following/{user}", user=user))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
cool