Skip to content

Instantly share code, notes, and snippets.

@5263
Last active April 24, 2021 16:33
Show Gist options
  • Save 5263/e04bdd5d4dd6b4fbac49 to your computer and use it in GitHub Desktop.
Save 5263/e04bdd5d4dd6b4fbac49 to your computer and use it in GitHub Desktop.
gitwebparser
#!/usr/bin/env python3
import bs4
import requests
# Shared HTTP session used for every gitweb request made through `sa`.
sa = requests.Session()
# SECURITY: disables TLS certificate verification for all requests sent through
# this session (the author marks it as a workaround).
sa.verify = False # workaround
# sa.auth = BearerAuth()
# NOTE(review): this only stores an attribute on the session object; requests is
# not known to apply a session-level timeout by itself, so it presumably has an
# effect only where a caller passes it on explicitly -- confirm.
sa.timeout = 10
def url2hash(href):
    """Return the value of the ``h`` query parameter of a gitweb link.

    The query string is everything after the first ``?``; its parameters are
    expected to be separated by ``;``.

    Args:
        href: link target, e.g. ``"/?p=proj.git;a=tree;h=<hash>;hb=<hash>"``.

    Returns:
        The text following ``h=`` in the first matching parameter.

    Raises:
        IndexError: if the link carries no ``h=`` parameter.
    """
    # Split *after* the "?" so that an ``h=`` in first position is recognised
    # too (slicing from the "?" itself left it glued to the first parameter).
    query = href.partition("?")[2]
    return [part[2:] for part in query.split(";") if part.startswith("h=")][0]
def githash(str1, otype="blob"):
    """Compute the SHA-1 hex digest of ``"<otype> <len>\\0" + str1``.

    Args:
        str1: object content as bytes.
        otype: object type name placed in the header (default ``"blob"``).

    Returns:
        The 40-character hexadecimal digest.
    """
    import hashlib

    # Header layout: type, a space, the decimal content length, a NUL byte.
    header = otype.encode("utf-8") + b" " + str(len(str1)).encode("ascii") + b"\0"
    return hashlib.sha1(header + str1).hexdigest()
def convertdate(str1):
    """Convert a date string into ``"<unix timestamp> <timezone>"``.

    The input is cut at fixed offsets: the last 20 characters are dropped to
    obtain the part parsed as ``"%a, %d %b %Y %H:%M:%S"`` (interpreted as UTC),
    and characters ``[-6:-1]`` are taken as the timezone. Presumably this
    matches strings such as
    ``"Sat, 24 Apr 2021 14:33:00 +0000 (16:33 +0200)"`` -- confirm against the
    pages actually served.

    Args:
        str1: the date text.

    Returns:
        The timestamp (seconds since the epoch) and the timezone, separated by
        one space.
    """
    import calendar
    import time

    zone = str1[-6:-1]
    utcpart = str1[:-20]
    seconds = calendar.timegm(time.strptime(utcpart, "%a, %d %b %Y %H:%M:%S"))
    return "{:d} {}".format(seconds, zone)
def convertrights(rights):
    """Translate an ``ls -l`` style mode string into a git tree-entry mode.

    Only the last ten characters of ``rights`` are inspected, so leading text
    is ignored.

    Args:
        rights: text ending in a mode string such as ``"-rw-r--r--"``.

    Returns:
        ``"040000"`` for directories (``d``), ``"120000"`` for symbolic links
        (``l``), ``"160000"`` for submodule links (``m``), otherwise ``"100"``
        followed by the three octal permission digits.
    """

    def rwx2oct(rwx):
        # Every character other than "-" sets one bit; the last one is bit 0.
        return sum((flag != "-") << bit for bit, flag in enumerate(rwx[::-1]))

    kind = rights[-10:-9]
    if kind == "d":  # tree
        return "040000"
    if kind == "l":  # symlink: a fixed mode, its permission bits are not used
        return "120000"
    if kind == "m":  # submodule (gitlink)
        return "160000"
    # blob
    return "100%d%d%d" % (
        rwx2oct(rights[-9:-6]),
        rwx2oct(rights[-6:-3]),
        rwx2oct(rights[-3:]),
    )
def checkobjects(objlist):
    """Ask ``git cat-file --batch-check`` about the given object names.

    Args:
        objlist: iterable of object names; they are sent one per line.

    Returns:
        ``(present, missing)`` where ``present`` maps the first field of each
        output line to ``(type, size)`` and ``missing`` is the set of names
        whose output line reports "missing". ``None`` is returned when git
        exits with a non-zero status.
    """
    import subprocess

    result = subprocess.run(
        ["git", "cat-file", "--batch-check"],
        input="\n".join(objlist),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    if result.returncode:
        return None

    present = {}
    missing = set()
    for rawline in result.stdout.split("\n"):
        stripped = rawline.strip()
        if not stripped:
            continue
        fields = stripped.split(" ")
        if "missing" in fields[1]:
            missing.add(fields[0])
            continue
        try:
            present[fields[0]] = (fields[1], int(fields[2]))
        except ValueError:
            # The size field was not a number: show the line and carry on.
            print(fields)
    return present, missing
def addobject(objstr, otype="blob"):
    """Feed ``objstr`` to ``git hash-object -w --stdin -t <otype>``.

    Args:
        objstr: object content as bytes (the pipes are opened in binary mode).
        otype: object type handed to ``-t`` (default ``"blob"``).

    Returns:
        git's standard output with surrounding whitespace stripped, as bytes,
        or ``None`` when git exits with a non-zero status.
    """
    import subprocess

    result = subprocess.run(
        ["git", "hash-object", "-w", "--stdin", "-t", otype],
        input=objstr,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=False,
    )
    if result.returncode:
        return None
    return result.stdout.strip()
def addtree(entrylist):
    """Feed tree entries to ``git mktree --missing``.

    Args:
        entrylist: iterable of entry lines (``"<mode> <type> <hash>\\t<name>"``
            as produced by ``parsetree``); they are sent one per line.

    Returns:
        git's standard output with surrounding whitespace stripped, or ``None``
        when git exits with a non-zero status.
    """
    import subprocess

    result = subprocess.run(
        ["git", "mktree", "--missing"],
        input="\n".join(entrylist),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    if result.returncode:
        return None
    return result.stdout.strip()
def parsetree(str1):
    """Extract the entries of a tree from an HTML page.

    Every ``<tr>`` of the ``<table class="tree">`` element is read: the first
    cell holds the mode string, the third the entry name, and the link in the
    fourth cell supplies both the object type (its text) and the object hash
    (the ``h=`` parameter of its target).

    Args:
        str1: HTML of the tree page.

    Returns:
        ``(treeentries, subtrees, blobs)``: the entry lines formatted as
        ``"<mode> <type> <hash>\\t<name>"``, the set of hashes of entries typed
        ``tree`` and the set of hashes of entries typed ``blob``.
    """
    soup = bs4.BeautifulSoup(str1)
    table = soup.find("table", {"class": "tree"})
    treeentries = []
    subtrees = set()
    blobs = set()
    # Entries of any other type are listed but collected in neither set.
    collectors = {"tree": subtrees, "blob": blobs}
    for row in table.findAll("tr"):
        cells = row.findAll("td")
        mode = convertrights(cells[0].text)
        entryname = cells[2].text
        link = cells[3].find("a")
        otype = link.text
        hash1 = url2hash(link.attrs["href"])
        treeentries.append("%s %s %s\t%s" % (mode, otype, hash1, entryname))
        collector = collectors.get(otype)
        if collector is not None:
            collector.add(hash1)
    return treeentries, subtrees, blobs
def parsetag(str1):
    """Rebuild the content of a tag object from an HTML page.

    The tag name comes from the ``<a class="title">`` link, whose target's last
    40 characters are taken as the expected tag hash. The message is the text
    of ``<div class="page_body">``; the tagged object, the tagger and the date
    are the second cells of the first three rows of
    ``<table class="object_header">``. The object type is always written as
    ``commit``.

    Args:
        str1: HTML of the tag page.

    Returns:
        The UTF-8 encoded tag object content.

    Raises:
        RuntimeError: if the hash of the rebuilt content differs from the hash
            taken from the title link.
    """
    soup = bs4.BeautifulSoup(str1)
    titlelink = soup.find("a", {"class": "title"})
    tagname = titlelink.text.strip().replace("\xa0", " ")
    taghash = titlelink["href"][-40:]
    message = soup.find("div", {"class": "page_body"}).text.strip().replace("\xa0", " ")
    rows = soup.find("table", {"class": "object_header"}).findAll("tr")

    def value(index):
        # Second cell of the given header row, surrounding whitespace removed.
        return rows[index].findAll("td")[1].text.strip()

    commithash = value(0)
    tagger = value(1)
    taggerdate = value(2)
    text = "object %s\ntype commit\ntag %s\ntagger %s %s\n\n%s\n" % (
        commithash,
        tagname,
        tagger,
        convertdate(taggerdate),
        message,
    )
    tagobj = text.replace("\xa0", " ").encode("utf8")
    if githash(tagobj, "tag") != taghash:
        raise RuntimeError
    return tagobj
def _commitbytes(treehash, parenthashes, authorline, committerline, message):
    """Serialise commit content; one ``parent`` line per entry, none if empty."""
    headers = ["tree %s" % treehash]
    headers.extend("parent %s" % parenthash for parenthash in parenthashes)
    headers.append("author %s" % authorline)
    headers.append("committer %s" % committerline)
    text = "\n".join(headers) + "\n\n" + message
    return text.replace("\xa0", " ").encode("utf8")


def parsecommit(str1):
    """Rebuild the content of a commit object from an HTML page.

    The message is the text of ``<div class="page_body">``. The second cells of
    the rows of ``<table class="object_header">`` are read in this order:
    author, author date, committer, committer date, commit hash, tree hash,
    then one row per parent.

    Because the page does not preserve the message byte for byte, several
    spellings of the message are tried until the hash of the rebuilt object
    equals the commit hash shown on the page.

    Args:
        str1: HTML of the commit page.

    Returns:
        ``(commitobj, parenthashes, treehash)``: the UTF-8 encoded commit
        content, the list of parent hashes and the tree hash.

    Raises:
        ValueError: if no candidate message reproduces the commit hash.
    """
    s = bs4.BeautifulSoup(str1)
    subject = s.find("div", {"class": "page_body"}).text.strip().replace("\xa0", " ")
    trs = s.find("table", {"class": "object_header"}).findAll("tr")

    def value(row):
        # Second cell of a header row, surrounding whitespace removed.
        return row.findAll("td")[1].text.strip()

    author = value(trs[0])
    authordate = value(trs[1])
    committer = value(trs[2])
    committerdate = value(trs[3])
    commithash = value(trs[4])
    treehash = value(trs[5])
    parenthashes = [value(tr) for tr in trs[6:]]

    authorline = "%s %s" % (author, convertdate(authordate))
    committerline = "%s %s" % (committer, convertdate(committerdate))

    # First the plain message, then the fallback spellings.
    candidates = (
        "%s\n" % subject,
        "Automatic %s\n" % subject,
        subject.strip(),
        "%s\n" % subject.strip(),
        " %s\n" % subject,
    )
    commitobj = b""
    for attempt, message in enumerate(candidates):
        if attempt:
            print("retrying")
        # Parent lines are only emitted when there are parents: a root commit
        # must not get an empty line between "tree" and "author".
        commitobj = _commitbytes(
            treehash, parenthashes, authorline, committerline, message
        )
        newhash = githash(commitobj, "commit")
        if newhash == commithash:
            return commitobj, parenthashes, treehash
        print("wanted", commithash)
        print("got ", newhash)
    print(commitobj)
    raise ValueError("could not reproduce commit %s" % commithash)
def downloadobjurllib(url, project, hash1, otype="commit"):
    """Download a page with the standard library instead of ``requests``.

    The address is built as ``"<url>?p=<project>;a=<otype>;h=<hash1>"``; the
    values are inserted as given, without URL-encoding.

    Args:
        url: base address.
        project: value of the ``p`` parameter.
        hash1: value of the ``h`` parameter.
        otype: value of the ``a`` parameter (default ``"commit"``).

    Returns:
        The response body as bytes.
    """
    # urllib2 only exists on Python 2; this script runs on Python 3.
    from urllib.request import urlopen

    # The context manager closes the response even when read() fails.
    with urlopen("%s?p=%s;a=%s;h=%s" % (url, project, otype, hash1)) as resp:
        return resp.read()
def downloadobj(url, project, hash1, otype="commit"):
    """Download a page or raw blob through the shared session ``sa``.

    Args:
        url: base address.
        project: value of the ``p`` parameter.
        hash1: value of the ``h`` parameter.
        otype: value of the ``a`` parameter (default ``"commit"``).

    Returns:
        The raw body as bytes when ``otype`` is ``"blob_plain"``, otherwise the
        decoded response text.
    """
    params = {"p": project, "a": otype, "h": hash1}
    # requests only honours a timeout passed with the request itself, so the
    # value stored on the session object is forwarded explicitly (None, i.e.
    # no timeout, when the attribute is absent).
    r = sa.get(url, params=params, timeout=getattr(sa, "timeout", None))
    if otype == "blob_plain":
        return r.content
    return r.text
def traversecommits(
    hashes, stopifcommitpresent=True, traversetree=True, downloadblobs=True, stopat=()
):
    """Walk commits starting from ``hashes`` and store the downloaded ones.

    The given refs are always downloaded. Parents are downloaded only when
    ``checkobjects`` reports them missing; otherwise their tree and parents are
    read from the local repository. Uses the module-level ``url`` and ``proj``.

    Args:
        hashes: refs or commit hashes to start from.
        stopifcommitpresent: when true, after each commit the pending parents
            are reduced to those that are missing locally.
        traversetree: when true, the root trees of all visited commits are
            handed to ``traversetree2`` afterwards.
        downloadblobs: forwarded to ``traversetree2``.
        stopat: commits after whose traversal the walk stops (once all of them
            have been visited).

    Returns:
        dict mapping each requested name to the value returned by ``addobject``
        whenever the two differ.
    """
    import subprocess

    stopats = frozenset(stopat)
    commitstraversed = set()
    refnames = {}
    refs = set(hashes)
    parents = set()
    roottrees = set()
    while parents or refs:
        if refs:
            hash1 = refs.pop()
            print("ref %s" % hash1, end=" ")
            downloadcommit = True
        else:
            hash1 = parents.pop()
            print(hash1, end=" ")
            downloadcommit = len(checkobjects((hash1,))[1]) > 0
        if downloadcommit:
            print(" not present")
            str1 = downloadobj(url, proj, hash1, "commit")
            co, p, t = parsecommit(str1)
            hash2 = addobject(co, "commit")
            # NOTE(review): addobject returns bytes (or None) while hash1 is a
            # str, so this looks true for every downloaded commit -- confirm
            # whether that is intended before changing it.
            if hash2 != hash1:
                refnames[hash1] = hash2
        else:
            print(" present")
            # hash1 may originate from a downloaded page: pass it as a single
            # argument instead of interpolating it into a shell command.
            str1 = subprocess.run(
                ["git", "cat-file", "commit", hash1],
                stdout=subprocess.PIPE,
                universal_newlines=True,
            ).stdout
            l1 = str1.split("\n")
            t = l1[0].split(" ", 1)[1].strip()
            p = [ps[7:].strip() for ps in l1 if ps.startswith("parent ")]
        roottrees.add(t)
        parents.update(p)
        commitstraversed.add(hash1)
        if stopifcommitpresent:
            parents = checkobjects(tuple(parents))[1]
        else:
            parents.discard(hash1)
        print(len(parents))
        if len(stopats) and stopats.issubset(commitstraversed):
            break
    if traversetree:
        traversetree2(tuple(roottrees), downloadblobs=downloadblobs)
    return refnames
def subtreeslocal(tree):
    """List the direct subtrees and blobs of a tree using ``git ls-tree``.

    Args:
        tree: name of the tree object.

    Returns:
        ``(subtrees, blobs)``: two lists of object hashes, in output order.
        Entries of any other type are ignored.
    """
    import subprocess

    # The name is passed as a single argument (no shell involved) and the
    # pipe is closed by subprocess.run.
    listing = subprocess.run(
        ["git", "ls-tree", tree],
        stdout=subprocess.PIPE,
        universal_newlines=True,
    ).stdout
    subtrees = []
    blobs = []
    for bline in listing.split("\n"):
        if not bline.strip():
            continue
        # Line layout: "<flags> <type> <hash>\t<path>".
        odata, opath = bline.split("\t", 1)
        oflags, otype, ohash = odata.split(" ", 2)
        if otype == "tree":
            subtrees.append(ohash)
        elif otype == "blob":
            blobs.append(ohash)
    return subtrees, blobs
def traversetree2(roottrees, downloadblobs=False):
    """Descend the given trees recursively, downloading what is missing.

    Trees reported missing by ``checkobjects`` are downloaded and stored with
    ``addtree``; trees already present are listed with ``subtreeslocal``.
    Trees contained in the module-level set ``completelydescended`` are
    skipped, and every tree is added to that set once handled. Uses the
    module-level ``url`` and ``proj``.

    Args:
        roottrees: tree hashes to start from.
        downloadblobs: when true, blobs referenced by the visited trees and
            missing locally are downloaded after the descent.
    """
    localypresent = set()
    blobswanted = set()

    def descenttree(treehash):
        if treehash in completelydescended:
            return
        st = None
        if treehash not in localypresent:
            # Double check whether it was downloaded previously.
            _, m = checkobjects((treehash,))
            if len(m) > 0:
                print("downloading %s" % treehash)
                te, st, bl = parsetree(downloadobj(url, proj, treehash, "tree"))
                addtree(te)
        if st is None:
            st, bl = subtreeslocal(treehash)
        localypresent.add(treehash)
        for subtree in st:
            descenttree(subtree)
        if downloadblobs:
            # One git invocation per tree is enough; the result is reused.
            blobswanted.update(checkobjects(bl)[1])
        completelydescended.add(treehash)

    for roott in roottrees:
        print("descending tree %s" % roott)
        descenttree(roott)
    if downloadblobs:
        skipped = set()
        _, m = checkobjects(tuple(blobswanted))
        for blobhash in tuple(m):
            print("downloading blob %s" % blobhash)
            blobstr = downloadobj(url, proj, blobhash, "blob_plain")
            h1 = addobject(blobstr, "blob")
            if h1 is None:
                # addobject returns None when git fails; report the blob
                # instead of failing on None.decode().
                print("could not store blob %s" % blobhash)
                skipped.add(blobhash)
            elif h1.decode("utf-8") != blobhash:
                print("Hash mismatch!!! %s" % h1)
                skipped.add(h1)
        if len(skipped):
            print("skipped %s" % skipped)
def traversetree(treehashes):
    """Download trees iteratively until none of the known ones is missing.

    Each round asks ``checkobjects`` which of the known trees are missing,
    downloads one of them, stores it with ``addtree`` and adds its subtrees to
    the known set. Uses the module-level ``url`` and ``proj``.

    Args:
        treehashes: tree hashes to start from.
    """
    knowntrees = set(treehashes)
    while True:
        knowntrees = checkobjects(tuple(knowntrees))[1]
        if not knowntrees:
            # Nothing left to download: finish normally instead of ending the
            # loop with a KeyError from pop() on an empty set.
            break
        h = knowntrees.pop()
        print(h, "of", len(knowntrees))
        te, st, bl = parsetree(downloadobj(url, proj, h, "tree"))
        print(addtree(te))
        knowntrees.update(st)
def parseheads(str1):
    """Extract the rows of the ``<table class="heads">`` element.

    Args:
        str1: HTML of the page listing the heads.

    Returns:
        A list with one ``(name, hash, firstcell)`` tuple per table row: the
        text of the link in the second cell, the ``h=`` parameter of that
        link's target and the text of the first cell.
    """
    soup = bs4.BeautifulSoup(str1)
    rows = soup.find("table", {"class": "heads"}).findAll("tr")

    def headof(row):
        cells = row.findAll("td")
        link = cells[1].a
        name = link.text
        hash1 = url2hash(link.attrs["href"])
        return (name, hash1, cells[0].text)

    return [headof(row) for row in rows]
def fetch(
    namesandhashes,
    traversetree=True,
    stopifcommitpresent=True,
    downloadblobs=True,
    stopat=(),
    astags=False,
):
    """Fetch the given refs and point local branches or tags at them.

    Args:
        namesandhashes: iterable of ``(name, ref)`` pairs; ``name`` is the
            local branch or tag name, ``ref`` is handed to ``traversecommits``.
        traversetree: forwarded to ``traversecommits``.
        stopifcommitpresent: forwarded to ``traversecommits``.
        downloadblobs: forwarded to ``traversecommits``.
        stopat: forwarded to ``traversecommits``.
        astags: create tags even for refs not starting with ``refs/tags/``.
    """
    import subprocess

    # Materialise once: the pairs are needed twice, and an empty input must
    # not fail while being unpacked.
    pairs = list(namesandhashes)
    hashes = tuple(ref for _, ref in pairs)
    # Fetch all at once to profit from already descended trees.
    refnames = traversecommits(
        hashes,
        traversetree=traversetree,
        stopifcommitpresent=stopifcommitpresent,
        downloadblobs=downloadblobs,
        stopat=stopat,
    )
    # Set refs.
    for name, ref in pairs:
        if ref not in refnames:
            continue
        hash1 = refnames[ref]
        if hash1 is None:
            # The commit could not be stored, so there is nothing to point at.
            print("could not store %s" % ref)
            continue
        if isinstance(hash1, bytes):
            hash1 = hash1.decode("utf-8")
        kind = "tag" if ref.startswith("refs/tags/") or astags else "branch"
        # Arguments are passed as a list so names are never parsed by a shell;
        # git's standard output is discarded, as it was never read before.
        subprocess.call(["git", kind, "-f", name, hash1], stdout=subprocess.DEVNULL)
        print("%s %s" % (hash1, name))
if __name__ == "__main__":
    # Module-level settings read by the download and traversal functions.
    url = ""
    proj = ""
    import pickle

    # Cache of trees that were already descended, kept between runs.
    # NOTE: pickle can execute code while loading; only use a cache file
    # written by this script.
    try:
        with open("completedtrees.pickle", "rb") as cachefile:
            completelydescended = pickle.load(cachefile)
    except Exception:
        # Missing or unreadable cache: start empty. Unlike a bare ``except``
        # this lets KeyboardInterrupt and SystemExit through.
        completelydescended = set()
    import sys

    # NOTE(review): fetching from the command line only happens when the cache
    # is non-empty -- confirm this is intended.
    if len(sys.argv) > 1 and completelydescended:
        # e.g. fetch((('master','refs/heads/master'),))
        branchlist = []
        for name in sys.argv[1:]:
            if len(name) == 40:
                # A 40-character argument is used as given.
                branchref = name
            elif name.startswith("refs/tags/"):
                branchref = name
                name = name[10:]
            else:
                branchref = f"refs/heads/{name}"
            branchlist.append((name, branchref))
        fetch(branchlist)
    else:
        if False:
            import IPython

            IPython.embed()  # damages the readline history
    with open("completedtrees.pickle", "wb") as cachefile:
        pickle.dump(completelydescended, cachefile, -1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment