Skip to content

Instantly share code, notes, and snippets.

@ynkdir
Created May 31, 2012 07:59
Show Gist options
  • Save ynkdir/2841754 to your computer and use it in GitHub Desktop.
Save ynkdir/2841754 to your computer and use it in GitHub Desktop.
github.py
#!/usr/bin/env python3
import base64
import email.generator
import email.mime.application
import email.mime.multipart
import email.mime.text
import io
import json
import re
import urllib.error
import urllib.parse
import urllib.request
class Github:
def __init__(self, username=None, password=None):
self.username = username
self.password = password
def http_request(self, url):
req = urllib.request.Request(url)
if self.username is not None and self.password is not None:
seed = self.username + ":" + self.password
token = base64.b64encode(seed.encode("utf-8")).decode("utf-8")
req.add_header("Authorization", "Basic " + token)
return req
def http_get(self, url):
req = self.http_request(url)
req.get_method = lambda: "GET"
return urllib.request.urlopen(req)
def http_post(self, url, data):
req = self.http_request(url)
req.get_method = lambda: "POST"
return urllib.request.urlopen(req, data)
def http_put(self, url):
req = self.http_request(url)
req.get_method = lambda: "PUT"
return urllib.request.urlopen(req, b"")
def http_delete(self, url):
req = self.http_request(url)
req.get_method = lambda: "DELETE"
return urllib.request.urlopen(req)
def http_post_multipart_form_data(self, url, params):
"""
@param url str
@param params list [(key1, value1), (key2, value2), ...]
"""
msg = email.mime.multipart.MIMEMultipart("form-data")
for name, value in params:
if isinstance(value, bytes):
m = email.mime.application.MIMEApplication(value)
else:
m = email.mime.text.MIMEText(value)
m.add_header("Content-Disposition", "form-data", name=name)
msg.attach(m)
# msg's boundary is set by generator
f = io.BytesIO()
g = email.generator.BytesGenerator(f)
g.flatten(msg, linesep="\r\n")
content = f.getvalue()
# remove header
content = content[content.find(b"\r\n\r\n") + 4:]
req = urllib.request.Request(url, content)
req.add_header("Content-Type", msg["Content-Type"])
return urllib.request.urlopen(req)
def parse_linkheader(self, s):
res = []
for link in s.split(','):
e = {}
m = re.search(r"<([^>]*)>", link)
e["url"] = m.group(1)
for m in re.finditer(r'(\w+)="([^"]*)"', link):
e[m.group(1).lower()] = m.group(2)
res.append(e)
return res
def fetch_allpages(self, url):
items = []
nexturl = url
while nexturl is not None:
with self.http_get(nexturl) as res:
data = json.loads(res.read().decode("utf-8"))
items.extend(data)
nexturl = None
if "link" in res.info():
for link in self.parse_linkheader(res.info()["link"]):
if link["rel"] == "next":
nexturl = link["url"]
break
return items
def fetch_one(self, url):
with self.http_get(url) as res:
data = json.loads(res.read().decode("utf-8"))
return data
def issue_list(self, user, repo):
url = "https://api.github.com/repos/{user}/{repo}/issues" \
.format(user=user, repo=repo) \
+ "?per_page=9999"
items = []
items.extend(self.fetch_allpages(url + "&state=open"))
items.extend(self.fetch_allpages(url + "&state=closed"))
items.sort(key=lambda e: e["number"])
return items
def issue_get(self, user, repo, number):
url = "https://api.github.com/repos/{user}/{repo}/issues/{number}" \
.format(user=user, repo=repo, number=number)
return self.fetch_one(url)
def issue_comment_list(self, user, repo, number):
url = "https://api.github.com/repos/{user}/{repo}/issues/{number}/comments" \
.format(user=user, repo=repo, number=number)
return self.fetch_allpages(url)
def issue_comment_get(self, user, repo, id_):
url = "https://api.github.com/repos/{user}/{repo}/issues/comments/{id}" \
.format(user=user, repo=repo, id=id_)
return self.fetch_one(url)
def org_member_list(self, org):
url = "https://api.github.com/orgs/{org}/members" \
.format(org=org)
return self.fetch_allpages(url)
def org_member_get(self, org, user):
url = "https://api.github.com/orgs/{org}/members/{user}" \
.format(org=org, user=user)
return self.fetch_one(url)
def org_public_member_list(self, org):
url = "https://api.github.com/orgs/{org}/public_members" \
.format(org=org)
return self.fetch_allpages(url)
def org_public_member_get(self, org, user):
url = "https://api.github.com/orgs/{org}/public_members/{user}" \
.format(org=org, user=user)
return self.fetch_one(url)
def org_team_list(self, org):
url = "https://api.github.com/orgs/{org}/teams" \
.format(org=org)
return self.fetch_allpages(url)
def team_get(self, id_):
url = "https://api.github.com/teams/{id}" \
.format(id=id_)
return self.fetch_one(url)
def team_member_list(self, id_):
url = "https://api.github.com/teams/{id}/members" \
.format(id=id_)
return self.fetch_allpages(url)
def team_member_get(self, id_, user):
url = "https://api.github.com/teams/{id}/members/{user}" \
.format(id=id_, user=user)
return self.fetch_one(url)
def team_member_add(self, id_, user):
url = "https://api.github.com/teams/{id}/members/{user}" \
.format(id=id_, user=user)
return self.http_put(url)
def repo_download_list(self, user, repo):
url = "https://api.github.com/repos/{user}/{repo}/downloads" \
.format(user=user, repo=repo)
return self.fetch_one(url)
def repo_download_get(self, user, repo, id_):
url = "https://api.github.com/repos/{user}/{repo}/downloads/{id}" \
.format(user=user, repo=repo, id=id_)
return self.fetch_one(url)
def repo_download_create(self, user, repo, name, size, description=None, content_type=None):
"""
@param name str
@param size int Size of file in bytes.
@param description str (optional)
@param content_type str (optional)
"""
url = "https://api.github.com/repos/{user}/{repo}/downloads" \
.format(user=user, repo=repo)
params = {}
params["name"] = name
params["size"] = int(size)
if description is not None:
params["description"] = description
if content_type is not None:
params["content_type"] = content_type
reqdata = json.dumps(params).encode("utf-8")
with self.http_post(url, reqdata) as res:
resdata = json.loads(res.read().decode("utf-8"))
return resdata
def repo_download_delete(self, user, repo, id_):
url = "https://api.github.com/repos/{user}/{repo}/downloads/{id}" \
.format(user=user, repo=repo, id=id_)
return self.http_delete(url)
def repo_download_upload(self, user, repo, name, data, description=None, content_type=None):
"""
@param name str
@param data bytes
@param description str (optional)
@param content_type str (optional)
"""
download = self.repo_download_create(user, repo, name, len(data),
description, content_type)
return self.http_post_multipart_form_data(download["s3_url"], [
("key", download["path"]),
("acl", download["acl"]),
("success_action_status", "201"),
("Filename", download["name"]),
("AWSAccessKeyId", download["accesskeyid"]),
("Policy", download["policy"]),
("Signature", download["signature"]),
("Content-Type", download["mime_type"]),
("file", data)])
def repo_watcher_list(self, user, repo):
url = "https://api.github.com/repos/{user}/{repo}/watchers" \
.format(user=user, repo=repo)
return self.fetch_allpages(url)
def user_watching_list(self, user):
url = "https://api.github.com/users/{user}/watched" \
.format(user=user)
return self.fetch_allpages(url)
# Check if authenticated user is watching repo
def user_watching_repo(self, user, repo):
url = "https://api.github.com/user/watched/{user}/{repo}" \
.format(user=user, repo=repo)
try:
res = self.http_get(url)
return True
except urllib.error.HTTPError as res:
if res.code == 404: # Not Found
return False
raise
# Let authenticated user watch repo.
def user_watch(self, user, repo):
url = "https://api.github.com/user/watched/{user}/{repo}" \
.format(user=user, repo=repo)
return self.http_put(url)
# Let authenticated user stop watch user.
def user_stopwatch(self, user, repo):
url = "https://api.github.com/user/watched/{user}/{repo}" \
.format(user=user, repo=repo)
return self.http_delete(url)
def user_follower_list(self, user):
url = "https://api.github.com/users/{user}/followers" \
.format(user=user)
return self.fetch_allpages(url)
def user_following_list(self, user):
url = "https://api.github.com/users/{user}/following" \
.format(user=user)
return self.fetch_allpages(url)
# Check if authenticated user is following user.
def user_following_user(self, user):
url = "https://api.github.com/user/following/{user}" \
.format(user=user)
try:
res = self.http_get(url)
return True
except urllib.error.HTTPError as res:
if res.code == 404: # Not Found
return False
raise
# Let authenticated user follow user.
def user_follow(self, user):
url = "https://api.github.com/user/following/{user}" \
.format(user=user)
return self.http_put(url)
# Let authenticated user unfollow user.
def user_unfollow(self, user):
url = "https://api.github.com/user/following/{user}" \
.format(user=user)
return self.http_delete(url)
@mattn
Copy link

mattn commented May 31, 2012

cool

@ynkdir
Copy link
Author

ynkdir commented May 31, 2012

thanks :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment