gitwebparser
#!/usr/bin/env python3
import bs4
import requests

sa = requests.Session()
sa.verify = False  # workaround: skip TLS certificate verification for the gitweb host
# sa.auth = BearerAuth()
sa.timeout = 10  # note: requests does not honor a timeout attribute on Session objects

def url2hash(href):
    # pull the "h=<sha1>" parameter out of a gitweb link
    return [p[2:] for p in href[href.find("?") :].split(";") if p.startswith("h=")][0]

def githash(str1, otype="blob"):
    # sha1 of the git object header ("<type> <size>\0") followed by the raw content
    import hashlib

    hash1 = hashlib.sha1(b"%b %d\0" % (otype.encode("utf-8"), len(str1)))
    hash1.update(str1)
    return hash1.hexdigest()

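# Quick sanity check for githash(), using the well-known value from
# `echo 'test content' | git hash-object --stdin`:
#   githash(b"test content\n")  ->  "d670460b4b4aece5915caf5c68d12f560a9fe3e4"
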
def convertdate(str1):
    # turn a gitweb date such as "Thu, 7 Apr 2005 22:13:13 +0000 (22:13 +0000)"
    # into git's "<unix timestamp> <timezone>" form
    import time

    timezone = str1[-6:-1]
    # utcdate=str1[:-14]
    timestruct = time.strptime(str1[:-20], "%a, %d %b %Y %H:%M:%S")
    import calendar

    timestamp = calendar.timegm(timestruct)
    return "%d %s" % (timestamp, timezone)

def convertrights(rights):
    # map a gitweb permission string to a git tree-entry mode
    rwx2oct = lambda rwx: sum(((c != "-") << i) for i, c in enumerate(rwx[::-1]))
    if rights[-10:-9] == "d":  # tree
        return "040000"
    else:  # blob
        u = rwx2oct(rights[-9:-6])
        g = rwx2oct(rights[-6:-3])
        a = rwx2oct(rights[-3:])
        return "100%d%d%d" % (u, g, a)

def checkobjects(objlist):
    # ask `git cat-file --batch-check` which of the given hashes exist locally
    import subprocess

    p = subprocess.Popen(
        ["git", "cat-file", "--batch-check"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    stdout, stderr = p.communicate("\n".join(objlist))
    p.stdin.close()
    if not p.wait():
        present = {}
        missing = set()
        for lineraw in stdout.split("\n"):
            line = lineraw.strip()
            if line:
                lsplit = line.split(" ")
                if "missing" in lsplit[1]:
                    missing.add(lsplit[0])
                else:
                    try:
                        present[lsplit[0]] = (lsplit[1], int(lsplit[2]))
                    except ValueError:
                        print(lsplit)
        return present, missing

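# By default `git cat-file --batch-check` prints "<sha1> <type> <size>" for objects that
# exist and "<sha1> missing" for unknown ones; checkobjects() parses those lines into the
# (present, missing) pair used by the traversal code below.
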
def addobject(objstr, otype="blob"):
    # write raw bytes into the object database via `git hash-object -w --stdin`;
    # returns the new object id as bytes (stdout is not decoded)
    import subprocess

    p = subprocess.Popen(
        ["git", "hash-object", "-w", "--stdin", "-t", otype],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,  # universal_newlines=True,
        text=False,
    )
    stdout, stderr = p.communicate(objstr)
    p.stdin.close()
    if not p.wait():
        return stdout.strip()

def addtree(entrylist):
    # build a tree object from ls-tree-style entry lines via `git mktree --missing`
    import subprocess

    p = subprocess.Popen(
        ["git", "mktree", "--missing"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    stdout, stderr = p.communicate("\n".join(entrylist))
    p.stdin.close()
    if not p.wait():
        return stdout.strip()

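# `git mktree` expects `git ls-tree`-formatted input, "<mode> <type> <sha1>\t<name>",
# which is exactly what parsetree() produces; --missing allows entries whose blobs or
# subtrees have not been downloaded yet.
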
def parsetree(str1):
    # scrape a gitweb "tree" page into ls-tree entry lines plus the sets of child trees and blobs
    s = bs4.BeautifulSoup(str1, "html.parser")
    treetab = s.find("table", {"class": "tree"})
    treeentries = []
    subtrees = set()
    blobs = set()
    for entry in treetab.findAll("tr"):
        tds = entry.findAll("td")
        gitrights = convertrights(tds[0].text)
        name = tds[2].text
        a = tds[3].find("a")
        otype = a.text
        href = a.attrs["href"]
        hash1 = url2hash(href)
        treeentries.append("%s %s %s\t%s" % (gitrights, otype, hash1, name))
        if otype == "tree":
            subtrees.add(hash1)
        elif otype == "blob":
            blobs.add(hash1)
    return treeentries, subtrees, blobs

def parsetag(str1):
    # rebuild a tag object from a gitweb "tag" page and verify it hashes back to the original id
    s = bs4.BeautifulSoup(str1, "html.parser")
    subject2 = s.find("a", {"class": "title"}).text.strip().replace("\xa0", " ")
    taghash = s.find("a", {"class": "title"})["href"][-40:]
    subject = s.find("div", {"class": "page_body"}).text.strip().replace("\xa0", " ")
    objhead = s.find("table", {"class": "object_header"})
    trs = objhead.findAll("tr")
    commithash = trs[0].findAll("td")[1].text.strip()
    author = trs[1].findAll("td")[1].text.strip()
    authordate = trs[2].findAll("td")[1].text.strip()
    tagobj = (
        (
            "object %s\ntype commit\ntag %s\ntagger %s %s\n\n%s\n"
            % (commithash, subject2, author, convertdate(authordate), subject)
        )
        .replace("\xa0", " ")
        .encode("utf8")
    )
    newhash = githash(tagobj, "tag")
    if newhash != taghash:
        raise RuntimeError("tag hash mismatch: wanted %s, got %s" % (taghash, newhash))
    else:
        return tagobj

def parsecommit(str1):
    # rebuild a commit object from a gitweb "commit" page and verify it hashes back to the
    # original id; gitweb may not reproduce the message byte-for-byte, so a few alternative
    # subject formattings are retried before giving up
    s = bs4.BeautifulSoup(str1, "html.parser")
    subject2 = s.find("a", {"class": "title"}).text.strip().replace("\xa0", " ")
    subject = s.find("div", {"class": "page_body"}).text.strip().replace("\xa0", " ")
    objhead = s.find("table", {"class": "object_header"})
    trs = objhead.findAll("tr")
    author = trs[0].findAll("td")[1].text.strip()
    authordate = trs[1].findAll("td")[1].text.strip()
    committer = trs[2].findAll("td")[1].text.strip()
    committerdate = trs[3].findAll("td")[1].text.strip()
    commithash = trs[4].findAll("td")[1].text.strip()
    treehash = trs[5].findAll("td")[1].text.strip()
    parenthashes = [tr.findAll("td")[1].text.strip() for tr in trs[6:]]
    parents = "\n".join(["parent %s" % ph for ph in parenthashes])
    commitobj = (
        (
            "tree %s\n%s\nauthor %s %s\ncommitter %s %s\n\n%s\n"
            % (
                treehash,
                parents,
                author,
                convertdate(authordate),
                committer,
                convertdate(committerdate),
                subject,
            )
        )
        .replace("\xa0", " ")
        .encode("utf8")
    )
    newhash = githash(commitobj, "commit")
    if newhash == commithash:
        return commitobj, parenthashes, treehash
    else:
        print("wanted", commithash)
        print("got ", newhash)
        for newsubject in (
            ("Automatic %s\n" % subject),
            (subject.strip()),
            ("%s\n" % subject.strip()),
            (" %s\n" % subject),
        ):
            print("retrying")
            commitobj = (
                (
                    "tree %s\n%s\nauthor %s %s\ncommitter %s %s\n\n%s"
                    % (
                        treehash,
                        parents,
                        author,
                        convertdate(authordate),
                        committer,
                        convertdate(committerdate),
                        newsubject,
                    )
                )
                .replace("\xa0", " ")
                .encode("utf8")
            )
            newhash = githash(commitobj, "commit")
            if newhash == commithash:
                return commitobj, parenthashes, treehash
            else:
                print("wanted", commithash)
                print("got ", newhash)
        print(commitobj)
        raise ValueError

def downloadobjurllib(url, project, hash1, otype="commit"):
    # legacy fetch helper using urllib instead of the requests session
    from urllib.request import urlopen

    resp = urlopen("%s?p=%s;a=%s;h=%s" % (url, project, otype, hash1))
    html = resp.read()
    resp.close()
    return html

def downloadobj(url, project, hash1, otype="commit"):
    # fetch one object page from gitweb: raw bytes for blob_plain, HTML text otherwise
    # fullurl='%s?p=%s;a=%s;h=%s' % (url,project,otype,hash1)
    params = {"p": project, "a": otype, "h": hash1}
    r = sa.get(url, params=params)
    if otype == "blob_plain":
        return b"".join(r.iter_content(2000))
        # r.encoding = None
    else:
        return r.text

def traversecommits(
    hashes, stopifcommitpresent=True, traversetree=True, downloadblobs=True, stopat=()
):
    # walk the commit graph starting from the given ref hashes, downloading and re-adding
    # any commits that are not already present locally; returns a dict of refs whose
    # reconstructed hash differs from the advertised one
    stopats = frozenset(stopat)
    commitstraversed = set()
    refnames = {}
    refs = set(hashes)
    parents = set()
    roottrees = set()
    while len(parents) > 0 or len(refs) > 0:
        processingref = len(refs) > 0
        if processingref:
            hash1 = refs.pop()
            print("ref %s" % hash1, end=" ")
            downloadcommit = True
        else:
            hash1 = parents.pop()
            print(hash1, end=" ")
            downloadcommit = len(checkobjects((hash1,))[1]) > 0
        if downloadcommit:
            print(" not present")
            str1 = downloadobj(url, proj, hash1, "commit")
            co, p, t = parsecommit(str1)
            hash2 = addobject(co, "commit").decode("utf-8")  # addobject returns bytes
            if hash2 != hash1:
                refnames[hash1] = hash2
        else:
            print(" present")
            import os

            str1 = os.popen("git cat-file commit %s" % hash1).read()
            l1 = str1.split("\n")
            t = l1[0].split(" ", 1)[1].strip()
            p = [ps[7:].strip() for ps in l1 if ps.startswith("parent ")]
        roottrees.add(t)
        parents.update(p)
        commitstraversed.add(hash1)
        if stopifcommitpresent:
            parents = checkobjects(tuple(parents))[1]
        else:
            parents.discard(hash1)
        print(len(parents))
        if len(stopats) and stopats.issubset(commitstraversed):
            break
    if traversetree:
        traversetree2(tuple(roottrees), downloadblobs=downloadblobs)
    return refnames

def subtreeslocal(tree):
    # list the direct subtrees and blobs of a locally present tree via `git ls-tree`
    import os

    subtrees = []
    blobs = []
    lstr = os.popen("git ls-tree %s" % tree)
    for bline in lstr.read().split("\n"):
        if bline.strip():
            odata, opath = bline.split("\t", 1)
            oflags, otype, ohash = odata.split(" ", 2)
            if otype == "tree":
                subtrees.append(ohash)
            elif otype == "blob":
                blobs.append(ohash)
    return subtrees, blobs

def traversetree2(roottrees, downloadblobs=False):
    # recursively mirror the tree objects below the given root trees, then optionally download
    # any blobs that are still missing; relies on the module-level `completelydescended` set
    # as a persistent cache of fully processed trees
    # completelydescended=set()
    localypresent = set()
    blobswanted = set()

    def descenttree(treehash):
        if treehash not in completelydescended:
            st = None
            if treehash not in localypresent:
                # double-check whether it was downloaded previously
                p, m = checkobjects((treehash,))
                if len(m) > 0:
                    print("downloading %s" % treehash)
                    te, st, bl = parsetree(downloadobj(url, proj, treehash, "tree"))
                    addtree(te)
                    localypresent.add(treehash)
            if st is None:
                st, bl = subtreeslocal(treehash)
                localypresent.add(treehash)
            for subtree in st:
                descenttree(subtree)
            if downloadblobs:
                p, m = checkobjects(bl)
                blobswanted.update(m)
            completelydescended.add(treehash)

    for roott in roottrees:
        print("descending tree %s" % roott)
        descenttree(roott)
    if downloadblobs:
        skipped = set()
        p, m = checkobjects(tuple(blobswanted))
        for blobhash in tuple(m):
            print("downloading blob %s" % blobhash)
            blobstr = downloadobj(url, proj, blobhash, "blob_plain")
            h1 = addobject(blobstr, "blob")
            if h1.decode("utf-8") != blobhash:
                print("Hash mismatch!!! %s" % h1)
                skipped.add(h1)
        if len(skipped):
            print("skipped %s" % skipped)

def traversetree(treehashes):
    # alternative non-recursive tree walker (traversecommits uses traversetree2 instead):
    # keep downloading whichever known trees are still missing locally; terminates with a
    # KeyError from pop() once nothing is missing
    import time

    knowntrees = set(treehashes)
    while True:
        knowntrees = checkobjects(tuple(knowntrees))[1]
        h = knowntrees.pop()
        print(h, "of", len(knowntrees))
        te, st, bl = parsetree(downloadobj(url, proj, h, "tree"))
        print(addtree(te))
        knowntrees.update(st)
        # time.sleep(2)

def parseheads(str1):
    # scrape the branch list from a gitweb "heads" page into (name, hash, first-column text) tuples
    s = bs4.BeautifulSoup(str1, "html.parser")
    t = s.find("table", {"class": "heads"})
    trs = t.findAll("tr")
    heads = []
    for tr in trs:
        tds = tr.findAll("td")
        a = tds[1].a
        name = a.text
        href = a.attrs["href"]
        hash1 = url2hash(href)
        heads.append((name, hash1, tds[0].text))
    return heads

def fetch(
    namesandhashes,
    traversetree=True,
    stopifcommitpresent=True,
    downloadblobs=True,
    stopat=(),
    astags=False,
):
    # download the history behind the given (refname, hash) pairs and point local
    # branches/tags at the result
    names, hashes = zip(*namesandhashes)
    # fetch all at once to profit from already-descended trees
    refnames = traversecommits(
        hashes,
        traversetree=traversetree,
        stopifcommitpresent=stopifcommitpresent,
        downloadblobs=downloadblobs,
        stopat=stopat,
    )
    # set refs
    for name, ref in namesandhashes:
        if ref in refnames:
            import os

            hash1 = refnames[ref]
            if isinstance(hash1, bytes):
                hash1 = hash1.decode("utf-8")
            if ref.startswith("refs/tags/") or astags:
                os.popen("git tag -f %s %s" % (name, hash1)).close()
            else:
                os.popen("git branch -f %s %s" % (name, hash1)).close()
            print("%s %s" % (hash1, name))

if __name__ == "__main__":
    url = ""  # gitweb base URL (used as a module-level global by the download helpers)
    proj = ""  # gitweb project name
    import pickle

    try:
        completelydescended = pickle.load(open("completedtrees.pickle", "rb"))
    except Exception:
        completelydescended = set()
    import sys

    if len(sys.argv) > 1 and completelydescended:
        # fetch((('master','refs/heads/master'),))
        branchlist = []
        for name in sys.argv[1:]:
            if len(name) == 40:
                branchref = name
            elif name.startswith("refs/tags/"):
                branchref = name
                name = name[10:]
            else:
                branchref = f"refs/heads/{name}"
            branchlist.append((name, branchref))
        fetch(branchlist)
    else:
        if False:
            import IPython

            IPython.embed()  # damages the readline history
    pickle.dump(completelydescended, open("completedtrees.pickle", "wb"), -1)
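
# Rough usage sketch (assuming `url` is set to the gitweb base URL and `proj` to the
# project name above): run the script inside an initialized git repository and pass
# branch names, tag refs, or 40-character commit hashes as arguments, e.g.
#
#   ./gitwebparser.py master refs/tags/v1.0
#
# Note that, as written, the argument-driven fetch only runs once a non-empty
# completedtrees.pickle exists (the `and completelydescended` guard above).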