Skip to content

Instantly share code, notes, and snippets.

@maliubiao
Last active January 1, 2016 07:29
Show Gist options
  • Select an option

  • Save maliubiao/8112188 to your computer and use it in GitHub Desktop.

Select an option

Save maliubiao/8112188 to your computer and use it in GitHub Desktop.
Git internals: read git SHA-1 object files.
import os.path
import pdb
import sys
import io
from struct import unpack
from time import ctime
# Bit layout of the 16-bit "flags" word stored with every index entry.
CE_STAGEMASK = 0x3000   # bits 12-13: merge stage
CE_EXTENDED = 0x4000    # bit 14: a second 16-bit flag word follows
CE_VALID = 0x8000       # bit 15 ("assume valid" in git's index format)
CE_STAGESHIFT = 12      # shift that moves the stage bits down to bit 0
CE_NAMEMASK = 0x0fff    # low 12 bits: path length, saturating at 0xfff


def read_from_disk(path):
    """Parse a git index file (versions 2 and 3) into a list of entries.

    path -- location of the index file, normally ".git/index".

    Returns a list with one dict per entry, in file order, with the keys
    "ctime" and "mtime" (float seconds), "dev", "ino", "mode", "uid",
    "gid", "size" (ints), "sha1" (40 hex digits) and "name" (the path).

    Raises Exception when the path is missing, the signature is not "DIRC",
    the version is not 2 or 3, or the file ends in the middle of an entry.
    """
    if not os.path.exists(path):
        raise Exception("path doesn't exists")
    fixed_len = 62  # 10 x uint32 stat fields + 20-byte SHA-1 + uint16 flags
    entries_list = []
    # The index is binary data: read it as bytes and let the context manager
    # close the handle even when parsing fails.
    with open(path, "rb") as index:
        if index.read(4) != b"DIRC":
            raise Exception("path is not a git index file")
        # All integers are big endian.
        version, entries = unpack(">II", index.read(8))
        if version not in (2, 3):
            # Version 4 prefix-compresses names and drops the padding, so the
            # fixed layout decoded below does not apply to it.
            raise Exception("unsupported git index version %d" % version)
        for _ in range(entries):
            fixed = index.read(fixed_len)
            if len(fixed) != fixed_len:
                raise Exception("truncated git index file")
            (ctime_sec, ctime_nsec, mtime_sec, mtime_nsec,
             dev, ino, mode, uid, gid, size,
             sha1, flags) = unpack(">10I20sH", fixed)
            entry_len = fixed_len
            if flags & CE_EXTENDED:
                # so far, ignore the second flag word
                index.read(2)
                entry_len += 2
            nlen = flags & CE_NAMEMASK
            name = index.read(nlen)
            if len(name) != nlen:
                raise Exception("truncated git index file")
            nul_seen = 0
            if nlen == CE_NAMEMASK:
                # The length field is saturated: the real name runs up to
                # the first NUL byte.
                while True:
                    ch = index.read(1)
                    if ch == b"":
                        raise Exception("truncated git index file")
                    if ch == b"\x00":
                        break
                    name += ch
                nul_seen = 1
            entry_len += len(name)
            # Entries are NUL-padded (1 to 8 bytes) to a multiple of eight.
            # Skipping the computed amount, instead of scanning for NULs,
            # cannot swallow leading zero bytes of the next entry.
            index.read(8 - entry_len % 8 - nul_seen)
            if not isinstance(name, str):
                # Python 3: hand back text, as Python 2 callers always got.
                name = name.decode("utf-8", "replace")
            entries_list.append({
                # the second word of each timestamp counts nanoseconds
                "ctime": ctime_sec + ctime_nsec / 1e9,
                "mtime": mtime_sec + mtime_nsec / 1e9,
                "dev": dev,
                "ino": ino,
                "mode": mode,
                "uid": uid,
                "gid": gid,
                "size": size,
                "sha1": "".join("%02x" % b for b in bytearray(sha1)),
                "name": name,
            })
    return entries_list
#! /usr/bin/env python
import zlib
import os.path
import sys
import pdb
import hashlib
import io
import re
import uuid
import time
from cStringIO import StringIO
from struct import unpack
from os.path import join as path_join
from subprocess import call
import cache_area
# Longest object path accepted; sha1_to_path() compares it against
# len(object_dir) + 43.
PATH_MAX = 1024
# Numeric object type codes.
# NOTE(review): the values look like git's own object type numbering (where
# 5 is unassigned and 6/7 are the two pack delta kinds) -- confirm against
# the git pack-format documentation before relying on that.
OBJ_BAD = -1
OBJ_NONE = 0
OBJ_COMMIT = 1
OBJ_TREE = 2
OBJ_BLOB = 3
OBJ_TAG = 4
OBJ_OFS_DELTA = 6
OBJ_REF_DELTA = 7
# Maps a type code to the "<type> " prefix that starts an object header.
# Not referenced by the code visible in this file.
sha1_file_header = {
OBJ_COMMIT: "commit ",
OBJ_TREE: "tree ",
OBJ_BLOB: "blob ",
OBJ_TAG: "tag "
}
# Default locations. They are relative paths, so they resolve against the
# current working directory.
default_gitobjects_dir = ".git/objects"
default_gitrefs_dir = ".git/refs"
default_git_dir = ".git"
def simple_print(obj):
    """Pretty-print a parsed git structure to stdout.

    obj -- a dict (printed as aligned "key: value" rows), a list of dicts
           (every dict printed the same way, back to back) or a tuple
           (one item per line). Any other type prints nothing.
    """
    row = "{:<15}: {:<20}"
    if isinstance(obj, dict):
        records = [obj]
    elif isinstance(obj, list):
        records = obj
    else:
        records = []
    for record in records:
        for key, value in record.items():
            # str() first: a width spec applied directly to a list or other
            # non-string value raises TypeError on Python 3, while Python 2
            # formatted str(value) anyway, so the output is unchanged there.
            print(row.format(key, str(value)))
    if isinstance(obj, tuple):
        for item in obj:
            print(item)
def read_sha1_file(path, content=True):
    """Read and decode one loose (zlib-compressed) git object file.

    path    -- location of the object file.
    content -- when true (the default) the payload is returned as well.

    Returns (type, length, payload) or, with content=False, (type, length).
    type is the text before the first space of the header, length is the
    size declared in the header, payload is everything after the header's
    NUL byte (bytes on Python 3).

    Raises Exception when the path is missing or the header is malformed;
    zlib.error propagates when the data cannot be decompressed.
    """
    if not os.path.exists(path):
        raise Exception("path doesn't exists")
    # Compressed data must be read as bytes; the with-block also closes the
    # handle before decompression gets a chance to fail.
    with open(path, "rb") as f:
        compressed = f.read()
    c = zlib.decompress(compressed)
    space = c.find(b" ")
    if space < 0 or space > 10:
        raise Exception("corrupted git sha1 file")
    length = c.find(b"\x00", space)
    if length < 0:
        raise Exception("corrupted git sha1 file")
    lenstr = c[space+1:length]
    # isdigit() also rejects an empty length and characters below '0'
    # (such as '-'), which the previous "> '9'" comparison let through.
    if not lenstr.isdigit():
        raise Exception("corrupted git sha1 file")
    obj_type = c[:space]
    if not isinstance(obj_type, str):
        # Python 3: callers compare the type against text such as "tree".
        obj_type = obj_type.decode("ascii", "replace")
    #type, length, content
    if content:
        return obj_type, int(lenstr), c[length+1:]
    return obj_type, int(lenstr)
def read_idx_file(path):
    """Parse a version 2 pack index (.idx) file.

    path -- location of the .idx file.

    Returns a dict with the keys
      "fanout"   -- the 256 cumulative first-byte counts, as a list
      "entries"  -- list of (sha1 hex, crc32, pack offset) tuples
      "total"    -- number of objects in the pack
      "packsha1" -- hex checksum of the pack file
      "idxsha1"  -- hex checksum of the index itself

    Raises Exception when the path is missing, the magic number or version
    is wrong, the fan-out table decreases, or the file is truncated.
    """
    if not os.path.exists(path):
        raise Exception("path doesn't exists")

    def to_hex(raw):
        # bytearray yields ints on both Python 2 and Python 3
        return "".join("%02x" % b for b in bytearray(raw))

    # Binary file; the with-block guarantees the handle is closed.
    with open(path, "rb") as f:

        def read_exact(count):
            data = f.read(count)
            if len(data) != count:
                raise Exception("truncated idx file")
            return data

        if f.read(4) != b"\xff\x74\x4f\x63":
            raise Exception("not a idx file")
        version = unpack(">I", read_exact(4))[0]
        if version != 2:
            raise Exception("unsupported idx version %d" % version)
        # Verify the "first level, fan out" table. It lets a lookup skip
        # eight binary search iterations, and its last slot is the total
        # number of objects.
        lookup_list = list(unpack(">256I", read_exact(1024)))
        nr = 0
        for n in lookup_list:
            if n < nr:
                raise Exception("non-monotonic index")
            nr = n
        #sha1 list
        sha1_blob = read_exact(20 * nr)
        sha1_list = [to_hex(sha1_blob[i:i + 20]) for i in range(0, 20 * nr, 20)]
        #crc list
        crc_list = unpack(">%dI" % nr, read_exact(4 * nr))
        #offset list
        offset_list = list(unpack(">%dI" % nr, read_exact(4 * nr)))
        # An offset with the top bit set is an index into a table of 64-bit
        # offsets that follows. That table must be consumed, otherwise the
        # two trailing checksums are read from the wrong position.
        large = [i for i, off in enumerate(offset_list) if off & 0x80000000]
        if large:
            wide = unpack(">%dQ" % len(large), read_exact(8 * len(large)))
            for i in large:
                slot = offset_list[i] & 0x7fffffff
                if slot >= len(wide):
                    raise Exception("corrupted idx file")
                offset_list[i] = wide[slot]
        pack_sha1 = read_exact(20)
        idx_checksum = read_exact(20)
    #merge list
    final_list = list(zip(sha1_list, crc_list, offset_list))
    return {
        "fanout": lookup_list,
        "entries": final_list,
        "total": nr,
        "packsha1": to_hex(pack_sha1),
        "idxsha1": to_hex(idx_checksum),
    }
def extract_git_pack(gitobjects_dir):
    """Explode every pack of the repository in the current directory into
    loose objects.

    gitobjects_dir -- the repository's objects directory; its "pack"
                      sub-directory is emptied on success.

    The packs are first moved into a scratch directory because
    "git unpack-objects" does not write objects the repository already has,
    which would include everything still sitting in objects/pack.

    Raises Exception when "git unpack-objects" fails for a pack; in that
    case the moved files are put back instead of being deleted.
    """
    import shutil
    import tempfile

    #repack all into one
    # Argument lists (no shell) keep odd characters in paths from being
    # interpreted as shell syntax.
    call(["git", "repack", "-a"])
    pack_dir = os.path.join(gitobjects_dir, "pack")
    if not os.path.isdir(pack_dir):
        return
    # Stay inside the working directory so the move is a cheap rename.
    tmpdir = tempfile.mkdtemp(prefix="gitpack-", dir=os.getcwd())
    moved = []
    try:
        #move packs to the scratch directory
        for entry in os.listdir(pack_dir):
            shutil.move(os.path.join(pack_dir, entry),
                        os.path.join(tmpdir, entry))
            moved.append(entry)
        #unpack them
        for entry in moved:
            if not entry.endswith(".pack"):
                continue
            with open(os.path.join(tmpdir, entry), "rb") as pack:
                status = call(["git", "unpack-objects"], stdin=pack)
            if status != 0:
                raise Exception("git unpack-objects failed for %s" % entry)
    except Exception:
        # Never throw away packs that were not fully unpacked.
        for entry in moved:
            stranded = os.path.join(tmpdir, entry)
            if os.path.exists(stranded):
                shutil.move(stranded, os.path.join(pack_dir, entry))
        raise
    finally:
        #remove the scratch directory (and, on success, the packs in it)
        shutil.rmtree(tmpdir, ignore_errors=True)
def read_sha1_tree(path):
    """Read a loose tree object and return its entries.

    path -- location of the loose object file.

    Returns a list of dicts with the keys "mode", "name" and "sha1"
    (40 hex digits), in the order stored in the object. "mode" is the mode
    text parsed as a base-10 integer (the text "100644" becomes the number
    100644), which is the form write_sha1_tree() formats back with "%d".

    Raises Exception when the object is not a tree or an entry is cut short.
    """
    ot, olen, content = read_sha1_file(path)
    if ot != "tree":
        raise Exception("Not a tree object")
    tree_entries = []
    pos = 0
    while True:
        # each entry is "<mode> <name>\0" followed by 20 raw SHA-1 bytes
        space = content.find(b"\x20", pos)
        if space < 0:
            break
        mode = int(content[pos:space])
        name_start = space + 1
        nul = content.find(b"\x00", name_start)
        if nul < 0:
            raise Exception("corrupted tree object file")
        name = content[name_start:nul]
        raw_sha1 = content[nul + 1:nul + 21]
        if len(raw_sha1) != 20:
            # a truncated final entry used to yield a short hash silently
            raise Exception("corrupted tree object file")
        pos = nul + 21
        if not isinstance(name, str):
            # Python 3: hand back text, as Python 2 callers always got.
            name = name.decode("utf-8", "replace")
        tree_entries.append({
            "mode": mode,
            "name": name,
            "sha1": "".join("%02x" % b for b in bytearray(raw_sha1)),
        })
    return tree_entries
def write_sha1_tree(tree_entries):
    """Serialize tree entries into a loose tree object under
    default_gitobjects_dir.

    tree_entries -- list of dicts with the keys "mode" (int, written with
                    "%d"), "name" and "sha1" (hex digits). Entries are
                    written in the order given.

    Returns the path of the object file that was written.

    Raises Exception when tree_entries is not a list.
    """
    import binascii

    if not isinstance(tree_entries, list):
        raise Exception("write_sha1_tree: need a list")
    chunks = []
    for t in tree_entries:
        name = t["name"]
        if not isinstance(name, bytes):
            name = name.encode("utf-8")
        chunks.append(("%d\x20" % t["mode"]).encode("ascii"))
        chunks.append(name)
        chunks.append(b"\x00")
        chunks.append(binascii.unhexlify(t["sha1"]))
    tree_content = b"".join(chunks)
    final = ("tree %d\x00" % len(tree_content)).encode("ascii") + tree_content
    sha1 = hashlib.sha1(final).hexdigest()
    sha1path = "%s/%s" % (sha1[:2], sha1[2:])
    loose_object_path = os.path.join(default_gitobjects_dir, sha1path)
    # The two-character fan-out directory does not exist until the first
    # object with that prefix is stored.
    fanout_dir = os.path.dirname(loose_object_path)
    if not os.path.isdir(fanout_dir):
        os.makedirs(fanout_dir)
    # zlib output is binary data
    with open(loose_object_path, "wb") as f:
        f.write(zlib.compress(final))
    return loose_object_path
def read_sha1_commit(path):
    """Read a loose commit object and return its fields.

    path -- location of the loose object file.

    Returns a dict with the keys
      "tree"      -- hash of the root tree
      "parents"   -- list of parent hashes (key absent when there are none)
      "author"    -- the author line without its "author " prefix
      "committer" -- the committer line without its "committer " prefix
      "content"   -- the commit message, leading/trailing newlines removed
      "time"      -- the second-to-last field of the author line, as an int

    Raises Exception when the object is not a commit or is malformed.
    """
    ot, olen, content = read_sha1_file(path)
    if ot != "commit":
        raise Exception("Not a commit object")
    if not isinstance(content, str):
        # Python 3: work on text, as the Python 2 code always did.
        content = content.decode("utf-8", "replace")
    commit_dict = {}
    # The header ends at the FIRST blank line. Searching from the right
    # would split inside a message that has several paragraphs.
    hend = content.find("\n\n")
    if hend < 0:
        raise Exception("corrupted commit object")
    header = content[:hend]
    message = content[hend+2:]
    node = header.split("\n")
    treet, _, sha1 = node[0].partition("\x20")
    if treet != "tree" or not sha1:
        raise Exception("corrupted commit object")
    commit_dict["tree"] = sha1
    for line in node[1:]:
        if line.startswith("parent "):
            commit_dict.setdefault("parents", []).append(line.split("\x20")[1])
        elif line.startswith("author "):
            commit_dict["author"] = line[7:]
        elif line.startswith("committer "):
            commit_dict["committer"] = line[10:]
    commit_dict["content"] = message.strip("\n")
    try:
        commit_dict["time"] = int(commit_dict["author"].split(" ")[-2])
    except (KeyError, IndexError, ValueError):
        raise Exception("corrupted commit object")
    return commit_dict
def write_sha1_commit(commit_object):
    """Serialize a commit description into a loose commit object under
    default_gitobjects_dir.

    commit_object -- dict with the keys "tree", "author", "committer",
                     "time" (int), "content" and optionally "parents"
                     (a sequence of hashes).

    Returns the path of the object file that was written.

    NOTE(review): the author and committer lines are written as
    "<value> <time>" with nothing after the timestamp. Confirm that callers
    pass the identity without a timestamp of its own.
    """
    lines = ["tree %s\n" % commit_object["tree"]]
    if "parents" in commit_object:
        for p in commit_object["parents"]:
            lines.append("parent %s\n" % p)
    lines.append("author %s %d\n" % (commit_object["author"], commit_object["time"]))
    lines.append("committer %s %d\n" % (commit_object["committer"], commit_object["time"]))
    lines.append("\n%s\n" % commit_object["content"])
    commit_content = "".join(lines)
    if not isinstance(commit_content, bytes):
        # the length in the object header counts bytes, not characters
        commit_content = commit_content.encode("utf-8")
    final = ("commit %d\x00" % len(commit_content)).encode("ascii") + commit_content
    sha1 = hashlib.sha1(final).hexdigest()
    sha1path = "%s/%s" % (sha1[:2], sha1[2:])
    loose_object_path = os.path.join(default_gitobjects_dir, sha1path)
    # The two-character fan-out directory does not exist until the first
    # object with that prefix is stored.
    fanout_dir = os.path.dirname(loose_object_path)
    if not os.path.isdir(fanout_dir):
        os.makedirs(fanout_dir)
    # zlib output is binary data
    with open(loose_object_path, "wb") as f:
        f.write(zlib.compress(final))
    return loose_object_path
def sha1_to_path(object_dir, sha1):
    """Return the loose-object path for sha1 below object_dir.

    object_dir -- the objects directory.
    sha1       -- hex object name; its first two characters become the
                  sub-directory and the rest the file name.

    Raises Exception when object_dir is too long for the resulting path to
    stay within PATH_MAX.
    """
    # 43 = two "/" separators + 40 hex digits + one spare character
    if len(object_dir) + 43 > PATH_MAX:
        # interpolate the directory; passing it as a second argument left
        # the "%s" placeholder unformatted in the message
        raise Exception("insanely long object directory %s" % object_dir)
    return "%s/%s/%s" % (object_dir, sha1[:2], sha1[2:])
def sha1_file_exists(object_dir, sha1part):
    """Resolve a full or abbreviated object name to its loose object path.

    object_dir -- the objects directory.
    sha1part   -- hex object name or a prefix of one; the first two
                  characters select the sub-directory, the rest must be
                  a prefix of exactly one file name inside it.

    Returns "<object_dir>/<xx>/<file name>" for the single match.

    Raises Exception((1, "no match")) when nothing matches and
    Exception((2, "multiple matches")) when the prefix is ambiguous.
    """
    prefix = "%s/%s" % (object_dir, sha1part[:2])
    if not os.path.exists(prefix):
        raise Exception((1, "no match"))
    rest = sha1part[2:]
    candidates = [name for name in os.listdir(prefix) if name.startswith(rest)]
    if not candidates:
        raise Exception((1, "no match"))
    if len(candidates) > 1:
        # must be raised: returning the exception object handed callers a
        # truthy non-path value
        raise Exception((2, "multiple matches"))
    return "%s/%s" % (prefix, candidates[0])
def build_objects_table(object_dir):
    """List every loose object stored below object_dir.

    Only sub-directories whose name is exactly two characters long are
    visited. Each file inside one contributes a dict with the keys "sha1"
    (directory name + file name), "type" and "length" (both taken from the
    object's header via read_sha1_file).

    Raises Exception when a directory name and file name do not add up to
    40 characters.
    """
    table = []
    for fanout_name in os.listdir(object_dir):
        if len(fanout_name) == 2:
            fanout_dir = path_join(object_dir, fanout_name)
            for leaf in os.listdir(fanout_dir):
                full_name = fanout_name + leaf
                if len(full_name) != 40:
                    raise Exception("Not git object directory")
                # header only; the payload is not needed for the listing
                kind, size = read_sha1_file(path_join(fanout_dir, leaf), content=False)
                table.append(dict(sha1=full_name, type=kind, length=size))
    return table
#build_objects_table(sys.argv[1])
#print read_sha1_file(sys.argv[1])
#print read_sha1_commit(sys.argv[1])
#print tree_entries
#print read_sha1_file(sys.argv[1], False)
def _load_commits():
    """Return (sha1, commit dict) pairs for every loose commit object under
    default_gitobjects_dir, newest author time first."""
    commits = []
    for entry in build_objects_table(default_gitobjects_dir):
        if entry["type"] == "commit":
            commit = read_sha1_commit(
                sha1_file_exists(default_gitobjects_dir, entry["sha1"]))
            commits.append((entry["sha1"], commit))
    commits.sort(key=lambda pair: pair[1]["time"], reverse=True)
    return commits


def _print_ref_commit(ref_path):
    """Print the commit named by the hash stored in the ref file ref_path."""
    with open(ref_path, "r") as f:
        # strip() instead of [:-1]: a ref file without a trailing newline
        # must not lose its last hex digit
        sha1 = f.read().strip()
    simple_print(read_sha1_commit(
        sha1_file_exists(default_gitobjects_dir, sha1)))


# Command line entry point: "python sha1_file.py <option> [argument]".
# The argument is taken as a path when it contains "/", otherwise as a
# (possibly abbreviated) object name that is looked up in .git/objects.
if __name__ == "__main__":
    if len(sys.argv) <= 1:
        print("usage: python sha1_file.py [option] sha1")
        sys.exit(1)
    option = sys.argv[1]
    sha1_file = None
    if len(sys.argv) > 2:
        args2 = sys.argv[2]
        # The old test was the chained comparison '"/" in args2 > 40'. On
        # Python 2 that reduces to this plain membership test; on Python 3
        # it raises TypeError.
        if "/" in args2:
            sha1_file = args2
            if not os.path.exists(args2):
                print("%s doesn't exists" % args2)
                sys.exit(0)
        else:
            try:
                sha1_file = sha1_file_exists(default_gitobjects_dir, args2)
            except Exception:
                # not an object name: pass the argument through unchanged
                # (--all-version uses it as a regular expression)
                sha1_file = args2
    if option == "-g":
        if not sha1_file:
            print("-g: need the path of a git sha1 file, or it's sha1 if we are in a git repo ")
            sys.exit(0)
        git_object = read_sha1_file(sha1_file)
        if git_object[0] == "blob":
            print(git_object[2])
        else:
            simple_print(git_object)
    elif option == "-t":
        if not sha1_file:
            print("-t: need the path of a git tree file, or it's sha1 if we are in a git repo ")
            sys.exit(0)
        simple_print(read_sha1_tree(sha1_file))
    elif option == "-c":
        if not sha1_file:
            print("-c: need the path of a git tree file, or sha1 if we are in a git repo ")
            sys.exit(0)
        simple_print(read_sha1_commit(sha1_file))
    elif option == "-a":
        if not sha1_file:
            sha1_file = default_gitobjects_dir
        for entry in build_objects_table(sha1_file):
            print("%s %s %s" % (entry["sha1"], entry["type"], entry["length"]))
    elif option == "-i":
        if not sha1_file:
            print("-i: need the path of a git idx file")
            sys.exit(0)
        idx_dict = read_idx_file(sha1_file)
        print("total %s" % idx_dict["total"])
        print("packsha1 %s" % idx_dict["packsha1"])
    elif option == "--list-commit":
        for sha1, commit in _load_commits():
            print("==========")
            print("commit %s" % sha1)
            commit["time"] = time.ctime(commit["time"])
            simple_print(commit)
    elif option == "--list-tree":
        for entry in build_objects_table(default_gitobjects_dir):
            if entry["type"] == "tree":
                print("==========")
                print("tree %s" % entry["sha1"])
                simple_print(read_sha1_tree(
                    sha1_file_exists(default_gitobjects_dir, entry["sha1"])))
    elif option == "--list-refs":
        if not sha1_file:
            sha1_file = default_gitrefs_dir
        for kind in os.listdir(sha1_file):
            # tuple membership; 'in "heads tags"' was a substring test that
            # also accepted names such as "head" or "s"
            if kind in ("heads", "tags"):
                heads = os.path.join(sha1_file, kind)
                for ref in os.listdir(heads):
                    print("=========")
                    if kind == "heads":
                        print("branch: %s" % ref)
                    else:
                        print("tag: %s" % ref)
                    _print_ref_commit(os.path.join(heads, ref))
            if kind == "remotes":
                remotes = os.path.join(sha1_file, kind)
                for remote_name in os.listdir(remotes):
                    remote = os.path.join(remotes, remote_name)
                    for ref in os.listdir(remote):
                        print("==========")
                        print("remotes/%s" % ref)
                        with open(os.path.join(remote, ref), "r") as f:
                            sha1 = f.read().strip()
                        if "ref:" not in sha1:
                            simple_print(read_sha1_commit(
                                sha1_file_exists(default_gitobjects_dir, sha1)))
                        else:
                            # symbolic ref: show what it points at
                            print(sha1)
            if kind == "stash":
                print("=========")
                print("stash: %s" % kind)
                _print_ref_commit(os.path.join(sha1_file, kind))
    elif option == "--extract-all":
        if not sha1_file:
            sha1_file = default_gitobjects_dir
        extract_git_pack(sha1_file)
    elif option == "--all-version":
        if not sha1_file:
            raise Exception("which file do you want?")
        commits = _load_commits()
        matcher = re.compile(sha1_file)
        for _, commit in commits:
            commit["time"] = time.ctime(commit["time"])
            tree = read_sha1_tree(
                sha1_file_exists(default_gitobjects_dir, commit["tree"]))
            for in_tree in tree:
                if matcher.match(in_tree["name"]):
                    print("=============")
                    print("in commit: ")
                    simple_print(commit)
                    print("%s %s" % (in_tree["sha1"], in_tree["name"]))
        # finally the version staged in the index
        for staged in cache_area.read_from_disk(
                os.path.join(default_git_dir, "index")):
            if matcher.match(staged["name"]):
                print("=============")
                print("in cache: ")
                print("%s %s" % (staged["sha1"], staged["name"]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment