Last active
March 14, 2019 13:49
-
-
Save FrankSpierings/834fab26134193eefb6de5b41c29d2d1 to your computer and use it in GitHub Desktop.
Grab git files from a web application.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import tempfile | |
import os | |
import logging | |
import shutil | |
import subprocess | |
import re | |
# Logging bootstrap.
#
# The script is meant to be pasted or re-run in an interactive session, so the
# root logger is only configured when no ``log`` object exists yet; this keeps
# repeated runs from stacking duplicate stream handlers (and lets a ``log``
# object supplied by the session be reused as-is).
if 'log' not in globals():
    # Console handler that prefixes every message with "[+] ".
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter('[+] %(message)s'))
    ch.setLevel(logging.DEBUG)

    # Root logger at DEBUG so the explorer's own messages are all shown.
    log = logging.getLogger()
    log.addHandler(ch)
    log.setLevel(logging.DEBUG)

    # The HTTP libraries are chatty at DEBUG/INFO; only keep their warnings.
    logging.getLogger("urllib3").setLevel(logging.WARNING)
    logging.getLogger("requests").setLevel(logging.WARNING)
class GitExplorer:
    """Mirror an exposed ``.git`` directory from a web server to local disk.

    The explorer downloads the well-known files of a git directory, then
    follows every object hash it can find (via ``git cat-file``) to fetch the
    loose objects those files reference.  URLs that could not be downloaded
    are remembered in ``notdownloadablefile.txt`` inside the destination
    directory so that a later run does not request them again.

    The module-level ``log`` object and the ``requests`` package are used for
    logging and HTTP respectively; the ``git`` executable must be on PATH.
    """

    # Seconds to wait on HTTP requests when an instance has no own value.
    timeout = 30

    # Any 40-hex-digit run inside object contents or ref listings.
    _HASH_RE = re.compile(rb'([a-f0-9]{40})', re.DOTALL | re.MULTILINE)
    # "<hash><whitespace><ref name>" lines as found in info/refs.
    _REF_LINE_RE = re.compile(rb'[a-f0-9]{40}\s+(.*?)$', re.DOTALL | re.MULTILINE)
    # Pack file names as listed in objects/info/packs.
    _PACK_RE = re.compile(rb'(pack-[a-f0-9]{40}\.pack)', re.DOTALL | re.MULTILINE)
    # A complete object name (SHA-1, or SHA-256 for newer repositories).
    _OBJECT_NAME_RE = re.compile(r'(?:[a-f0-9]{40}|[a-f0-9]{64})')

    def __init__(self, url, dst=None, timeout=30):
        """Prepare the local directory layout and probe the remote URL.

        Args:
            url: Base URL of the remote git directory (e.g. ``http://host/.git``).
            dst: Local working directory; ``<dst>/.git`` receives the files.
                When omitted, a fresh temporary directory is created under
                ``/tmp/data`` for this instance.
            timeout: Seconds to wait for each HTTP request.
        """
        self.url = url
        self.timeout = timeout
        if dst is None:
            # Created per instance at call time: a default evaluated in the
            # signature would run once at class definition (even on import),
            # fail when /tmp/data is missing, and be shared by all instances.
            os.makedirs('/tmp/data', exist_ok=True)
            dst = tempfile.mkdtemp(dir='/tmp/data')
        self.dir = dst
        os.makedirs(self.dir, exist_ok=True)
        self.gitdir = os.path.join(dst, '.git')
        os.makedirs(self.gitdir, exist_ok=True)
        self.preparedir()
        # Load the failure cache before any network access so it is always
        # available, even when the probe below raises.
        self.loadnotfound()
        if not self.checkurl():
            log.warning('Base URL returned 404: {0}'.format(self.url))

    def loadnotfound(self):
        """Load the list of previously failed URLs from disk (if present)."""
        self.notdownloadable = []
        self.notdownloadablefile = os.path.join(self.dir, 'notdownloadablefile.txt')
        if os.path.exists(self.notdownloadablefile):
            with open(self.notdownloadablefile, 'rb') as f:
                for line in f:
                    line = line.decode().strip()
                    if line:
                        self.notdownloadable.append(line)

    def updatenotfound(self, url):
        """Remember ``url`` as not downloadable, in memory and on disk."""
        self.notdownloadable.append(url)
        with open(self.notdownloadablefile, 'ab') as f:
            f.write(url.encode() + b'\n')

    def checkurl(self, url=None):
        """Return True unless ``url`` (default: the base URL) answers 404."""
        if url is None:
            url = self.url
        r = requests.get(url, timeout=self.timeout)
        return r.status_code != 404

    def preparedir(self):
        """Create the minimal directory skeleton of a git directory."""
        dirs = ('refs/heads', 'branches', 'objects/pack', 'objects/info')
        for d in dirs:
            os.makedirs(os.path.join(self.gitdir, d), exist_ok=True)

    def getbase(self):
        """Try to fetch every file that lives at a well-known path."""
        paths = (
            'HEAD',
            'objects/info/packs',
            'description',
            'config',
            'COMMIT_EDITMSG',
            'index',
            'packed-refs',
            'refs/heads/master',
            'refs/remotes/origin/HEAD',
            'refs/stash',
            'logs/HEAD',
            'logs/refs/heads/master',
            'logs/refs/remotes/origin/HEAD',
            'info/refs',
            'info/exclude',
        )
        for path in paths:
            self.getfile(path)

    def _localpath(self, relpath):
        """Map ``relpath`` to a path below ``self.gitdir``.

        Returns None when the result would be the git directory itself or
        would fall outside of it (``..`` components, absolute paths, symlink
        escapes).  Relative paths partly come from files served by the remote
        host, so they must not be trusted to stay inside the destination.
        """
        root = os.path.realpath(self.gitdir)
        candidate = os.path.realpath(os.path.join(root, relpath))
        if candidate.startswith(root + os.sep):
            return os.path.join(self.gitdir, relpath)
        return None

    def getfile(self, relpath):
        """Return the local path of ``relpath``, downloading it if needed.

        Returns None when the file is neither present locally nor
        downloadable, or when ``relpath`` points outside the git directory.
        """
        path = self._localpath(relpath)
        if path is None:
            log.warning('Refusing path outside of the git directory: {0}'.format(relpath))
            return None
        if os.path.exists(path) or self.downloadfile(relpath):
            return path
        return None

    def getobject(self, hashstr):
        """Return the local path of the loose object ``hashstr`` or None."""
        # Anything that is not a full object name cannot map to a loose
        # object file; reject it before touching the disk or the network.
        if not self._OBJECT_NAME_RE.fullmatch(hashstr):
            return None
        relpath = os.path.join('objects', hashstr[0:2], hashstr[2:])
        return self.getfile(relpath)

    def _catfile(self, flag, hashstr):
        """Run ``git cat-file <flag> <hashstr>`` in the git dir; return stdout."""
        p = subprocess.Popen(['git', 'cat-file', flag, hashstr],
                             cwd=self.gitdir,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             stdin=subprocess.PIPE)
        # communicate() drains both pipes and reaps the child, so no zombie
        # processes pile up and a full stderr pipe cannot block git.
        data, _ = p.communicate()
        return data

    def getobjectcontents(self, hashstr):
        """Return the pretty-printed contents of object ``hashstr`` as bytes."""
        return self._catfile('-p', hashstr)

    def getobjecttree(self, hashstr):
        """Fetch ``hashstr`` and every object reachable from it.

        Non-blob objects are pretty-printed and every 40-hex-digit string in
        the output is treated as a further object to fetch.  The walk is
        iterative and visits each object once per call, so shared trees are
        not re-processed and long histories cannot exhaust the recursion
        limit.
        """
        pending = [hashstr]
        seen = set()
        while pending:
            current = pending.pop()
            if current in seen:
                continue
            seen.add(current)
            if self.getobject(current) is None:
                continue
            if self.getobjecttype(current) == 'blob':
                continue
            data = self.getobjectcontents(current)
            found = [h.decode() for h in self._HASH_RE.findall(data)]
            # Reversed so the stack yields hashes in order of appearance.
            pending.extend(h for h in reversed(found) if h not in seen)

    def downloadfile(self, relpath):
        """Download ``relpath`` from the remote git directory.

        Returns True when the file was stored locally, False otherwise.
        Failed URLs are recorded so they are not requested again.
        """
        filepath = self._localpath(relpath)
        if filepath is None:
            log.warning('Refusing path outside of the git directory: {0}'.format(relpath))
            return False
        # URLs always use forward slashes, whatever the local separator is.
        url = '{0}/{1}'.format(self.url, relpath.replace(os.sep, '/'))
        if url in self.notdownloadable:
            log.warning('Unable to download (previously tried): {0}'.format(url))
            return False
        log.info('Attempt to download: {0}'.format(url))
        with requests.get(url, stream=True, timeout=self.timeout) as r:
            if r.status_code == 200:
                os.makedirs(os.path.dirname(filepath), exist_ok=True)
                # Write to a side file and rename when complete: getfile()
                # treats any existing file as finished, so an interrupted
                # download must never leave a truncated file in place.
                partial = filepath + '.part'
                try:
                    with open(partial, 'wb') as f:
                        # iter_content() undoes gzip/deflate transfer
                        # encoding; copying r.raw would store it verbatim.
                        for chunk in r.iter_content(chunk_size=65536):
                            f.write(chunk)
                    os.replace(partial, filepath)
                finally:
                    if os.path.exists(partial):
                        os.remove(partial)
                return True
        log.warning('Unable to download: {0}'.format(url))
        self.updatenotfound(url)
        return False

    def head(self):
        """Fetch the objects reachable from HEAD.

        Handles both a symbolic HEAD (``ref: refs/heads/...``) and a detached
        HEAD that holds an object name directly; blank lines are ignored.
        """
        path = self.getfile('HEAD')
        if path is None:
            return
        with open(path, 'r') as f:
            entries = [line.strip() for line in f]
        for entry in entries:
            if not entry:
                continue
            if not entry.startswith('ref:'):
                # Detached HEAD: the line is the commit itself.
                self.getobjecttree(entry)
                continue
            ref = entry[len('ref:'):].strip()
            filepath = self.getfile(ref)
            if filepath is None or not os.path.isfile(filepath):
                continue
            with open(filepath, 'r') as f2:
                hashes = [line.strip() for line in f2]
            for hashstr in hashes:
                if hashstr:
                    self.getobjecttree(hashstr)

    def refs(self):
        """Fetch every ref file and object listed in ``info/refs``."""
        path = self.getfile('info/refs')
        if path is None:
            return
        with open(path, 'rb') as f:
            lines = f.readlines()
        for data in lines:
            for newhashstr in self._HASH_RE.findall(data):
                self.getobjecttree(newhashstr.decode())
            for name in self._REF_LINE_RE.findall(data):
                name = name.decode('utf-8', 'replace').strip()
                # "<tag>^{}" entries describe the object a tag points at;
                # they are not files in the git directory.
                if not name or name.endswith('^{}'):
                    continue
                self.getfile(name)

    def packs(self):
        """Fetch every pack listed in ``objects/info/packs`` with its index."""
        path = self.getfile('objects/info/packs')
        if path is None:
            return
        with open(path, 'rb') as f:
            lines = f.readlines()
        for data in lines:
            for pack in self._PACK_RE.findall(data):
                packname = pack.decode()
                relpath = '{0}/{1}'.format('objects/pack', packname)
                self.getfile(relpath)
                # git only reads objects from a pack that has its .idx
                # companion next to it, so fetch that as well.
                self.getfile(relpath[:-len('.pack')] + '.idx')

    def getobjecttype(self, hashstr):
        """Return the type of object ``hashstr`` (empty string if unknown)."""
        output = self._catfile('-t', hashstr).strip().decode()
        log.debug('Object {0} is type {1}'.format(hashstr, output))
        return output

    def go(self):
        """Run the full retrieval: base files, refs, packs, then HEAD."""
        self.getbase()
        self.refs()
        self.packs()
        self.head()
# Example run: mirror the git directory served on localhost:8000 into
# /tmp/data/localhost-git and walk everything reachable from it.
g = GitExplorer(
    "http://localhost:8000/.git",
    dst='/tmp/data/localhost-git',
)
g.go()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note:
It seems the pack files can only be found if 'objects/info/packs' is available (reference: https://git-scm.com/book/en/v2/Git-Internals-Transfer-Protocols). Their names cannot be predicted or discovered in any other way (let me know if I'm wrong). This can be a problem when trying to download a repository blind, i.e. when directory indexes are disabled on the server.