Last active
January 1, 2016 07:29
-
-
Save maliubiao/8112188 to your computer and use it in GitHub Desktop.
Git internals: read git SHA-1 object files.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os.path | |
| import pdb | |
| import sys | |
| import io | |
| from struct import unpack | |
| from time import ctime | |
# Bit layout of the 16-bit "flags" field stored with every index entry.
CE_NAMEMASK = 0x0fff    # low 12 bits: length of the entry's path name
CE_STAGESHIFT = 12
CE_STAGEMASK = 0x3000   # stage bits; shift right by CE_STAGESHIFT to read them
CE_EXTENDED = 0x4000    # a second 16-bit flags word follows the first one
CE_VALID = 0x8000


def read_from_disk(path):
    """Parse a git index ("DIRC") file into a list of entry dicts.

    Only index versions 2 and 3 are handled; their entries are padded with
    NUL bytes to a multiple of eight bytes, which is what this parser relies
    on to find the start of the next entry.

    Args:
        path: location of the index file (for example ``.git/index``).

    Returns:
        A list with one dict per entry, in file order, with the keys
        ``ctime``, ``mtime`` (float seconds), ``dev``, ``ino``, ``mode``,
        ``uid``, ``gid``, ``size`` (ints), ``sha1`` (40 hex digits) and
        ``name`` (text).

    Raises:
        Exception: if the path does not exist, the signature is not "DIRC",
            the version is unsupported, or the file ends inside a record.
    """
    if not os.path.exists(path):
        raise Exception("path doesn't exists")

    def take(stream, count):
        # A short read means the file stops in the middle of a record.
        data = stream.read(count)
        if len(data) != count:
            raise Exception("truncated git index file")
        return data

    entries_list = []
    # Binary mode: the index is raw bytes, never text.
    with open(path, "rb") as index:
        if index.read(4) != b"DIRC":
            raise Exception("path is not a git index file")
        # big endian
        version, entries = unpack(">II", take(index, 8))
        if version not in (2, 3):
            # Version 4 prefix-compresses names and drops the padding, so
            # parsing it with this layout would return garbage.
            raise Exception("unsupported git index version %d" % version)
        for _ in range(entries):
            # Fixed part: ten 32-bit fields, 20-byte SHA1, 16-bit flags.
            fixed = take(index, 62)
            (ctime_sec, ctime_nsec, mtime_sec, mtime_nsec,
             dev, ino, mode, uid, gid, size) = unpack(">10I", fixed[:40])
            sha1 = fixed[40:60]
            flags = unpack(">H", fixed[60:])[0]
            header_len = 62
            if flags & CE_EXTENDED:
                # so far, ignore the second flag
                take(index, 2)
                header_len += 2
            nlen = flags & CE_NAMEMASK
            name = take(index, nlen)
            terminator_read = 0
            if nlen == CE_NAMEMASK:
                # The length field is saturated: the real name runs up to
                # the first NUL byte.
                while True:
                    ch = take(index, 1)
                    if ch == b"\x00":
                        break
                    name += ch
                terminator_read = 1
            # Skip the 1-8 NUL bytes that pad the entry to a multiple of 8.
            # Computing the count (instead of scanning for NULs) stays
            # correct when the next entry itself begins with zero bytes.
            entry_len = header_len + len(name)
            take(index, 8 - entry_len % 8 - terminator_read)
            if not isinstance(name, str):
                # Python 3: hand callers text, as Python 2 did.
                name = name.decode("utf-8", "replace")
            entries_list.append({
                # The second word of each timestamp is in nanoseconds.
                "ctime": ctime_sec + ctime_nsec / 1e9,
                "mtime": mtime_sec + mtime_nsec / 1e9,
                "dev": dev,
                "ino": ino,
                "mode": mode,
                "uid": uid,
                "gid": gid,
                "size": size,
                "sha1": "".join("%02x" % b for b in bytearray(sha1)),
                "name": name,
            })
    return entries_list
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python | |
| import zlib | |
| import os.path | |
| import sys | |
| import pdb | |
| import hashlib | |
| import io | |
| import re | |
| import uuid | |
| import time | |
| from cStringIO import StringIO | |
| from struct import unpack | |
| from os.path import join as path_join | |
| from subprocess import call | |
| import cache_area | |
# Longest object path sha1_to_path() is willing to build.
PATH_MAX = 1024

# object type (the value 5 is not assigned to any name here)
OBJ_BAD, OBJ_NONE = -1, 0
OBJ_COMMIT, OBJ_TREE, OBJ_BLOB, OBJ_TAG = 1, 2, 3, 4
OBJ_OFS_DELTA, OBJ_REF_DELTA = 6, 7

# Type keyword, including its trailing space, keyed by object type code.
sha1_file_header = dict([
    (OBJ_COMMIT, "commit "),
    (OBJ_TREE, "tree "),
    (OBJ_BLOB, "blob "),
    (OBJ_TAG, "tag "),
])

# Repository locations, relative to the current working directory.
default_git_dir = ".git"
default_gitobjects_dir = ".git/objects"
default_gitrefs_dir = ".git/refs"
def simple_print(obj):
    """Print a dict, a list of dicts, or a tuple to standard output.

    Dict items are printed one per line as ``key: value`` with the key
    left-aligned in 15 columns and the value in 20. A list is treated as a
    list of dicts and each one is printed that way. Tuple items are printed
    one per line. Any other type prints nothing.

    Args:
        obj: the dict, list of dicts, or tuple to display.
    """
    def show_mapping(mapping):
        for key, value in mapping.items():
            # Convert to str first: the "<15"/"<20" alignment specs are only
            # defined for strings, and values may be lists or ints.
            print("{:<15}: {:<20}".format(str(key), str(value)))

    if isinstance(obj, dict):
        show_mapping(obj)
    elif isinstance(obj, list):
        for item in obj:
            show_mapping(item)
    elif isinstance(obj, tuple):
        for item in obj:
            print(item)
def read_sha1_file(path, content=True):
    """Read a zlib-compressed loose object file and split off its header.

    The decompressed data is expected to look like
    ``<type> <decimal length>\\x00<body>``.

    Args:
        path: location of the loose object file.
        content: when true (the default) the body is returned as well.

    Returns:
        ``(type, length, body)`` when ``content`` is true, otherwise
        ``(type, length)``. ``type`` is text, ``length`` is the integer
        from the header, and ``body`` is the raw bytes after the NUL.

    Raises:
        Exception: if the path does not exist or the header is malformed.
    """
    if not os.path.exists(path):
        raise Exception("path doesn't exists")
    # Binary mode: the file holds compressed bytes, not text.
    with open(path, "rb") as f:
        c = zlib.decompress(f.read())
    space = c.find(b" ")
    if space < 0 or space > 10:
        raise Exception("corrupted git sha1 file")
    length = c.find(b"\x00", space)
    if length < 0:
        raise Exception("corrupted git sha1 file")
    lenstr = c[space + 1:length]
    # isdigit() also rejects an empty field and characters below "0"
    # (such as "-" or " ") that an upper-bound check alone lets through.
    if not lenstr.isdigit():
        raise Exception("corrupted git sha1 file")
    kind = c[:space]
    if not isinstance(kind, str):
        # Python 3: callers compare the type against text such as "tree".
        kind = kind.decode("ascii", "replace")
    # type, length, content
    if content:
        return kind, int(lenstr), c[length + 1:]
    return kind, int(lenstr)
def read_idx_file(path):
    """Parse a version-2 pack index (``.idx``) file.

    Args:
        path: location of the ``.idx`` file.

    Returns:
        A dict with the keys ``fanout`` (the 256-entry first-level table),
        ``entries`` (a list of ``(sha1 hex, crc32, pack offset)`` tuples in
        file order), ``total`` (number of objects), ``packsha1`` and
        ``idxsha1`` (the two trailing checksums as hex).

    Raises:
        Exception: if the path does not exist, the magic or version is not
            recognised, the fan-out table decreases, or the file is
            truncated or inconsistent.
    """
    if not os.path.exists(path):
        raise Exception("path doesn't exists")

    def take(stream, count):
        # A short read means the file stops in the middle of a table.
        data = stream.read(count)
        if len(data) != count:
            raise Exception("truncated idx file")
        return data

    def to_hex(raw):
        return "".join("%02x" % b for b in bytearray(raw))

    with open(path, "rb") as f:
        if f.read(4) != b"\xff\x74\x4f\x63":
            raise Exception("not a idx file")
        version = unpack(">I", take(f, 4))[0]
        if version != 2:
            # The table layout read below is the version-2 layout.
            raise Exception("unsupported idx version %d" % version)
        # verify The "first level, fan out" table
        # this table is used to avoid to
        # do eight extra binary search iterations
        lookup_list = list(unpack(">256I", take(f, 4 * 256)))
        nr = 0
        for n in lookup_list:
            if n < nr:
                raise Exception("non-monotonic index")
            nr = n
        # sha1 list
        names = take(f, 20 * nr)
        sha1_list = [to_hex(names[i:i + 20]) for i in range(0, 20 * nr, 20)]
        # crc list
        crc_list = unpack(">%dI" % nr, take(f, 4 * nr))
        # offset list
        offset_list = list(unpack(">%dI" % nr, take(f, 4 * nr)))
        # An offset with its top bit set is an index into a table of 8-byte
        # offsets that follows; resolve those so callers get real offsets
        # and the trailing checksums are read from the right place.
        big = sum(1 for off in offset_list if off & 0x80000000)
        if big:
            large = unpack(">%dQ" % big, take(f, 8 * big))
            for i, off in enumerate(offset_list):
                if off & 0x80000000:
                    slot = off & 0x7fffffff
                    if slot >= big:
                        raise Exception("corrupted idx file")
                    offset_list[i] = large[slot]
        pack_sha1 = take(f, 20)
        idx_checksum = take(f, 20)

    return {
        "fanout": lookup_list,
        # merge list
        "entries": list(zip(sha1_list, crc_list, offset_list)),
        "total": nr,
        "packsha1": to_hex(pack_sha1),
        "idxsha1": to_hex(idx_checksum),
    }
def extract_git_pack(gitobjects_dir):
    """Explode every pack under ``gitobjects_dir`` into loose objects.

    The packs are first consolidated with ``git repack -a``, then moved to
    a temporary directory created in the current working directory, and
    each ``.pack`` file is fed to ``git unpack-objects``. The temporary
    directory is removed afterwards. The git commands run in the current
    working directory.

    Args:
        gitobjects_dir: the repository's objects directory; its ``pack``
            subdirectory is the one that gets emptied.

    Raises:
        Exception: if ``git unpack-objects`` fails for a pack. In that case
            the moved files are put back before the exception propagates.
    """
    # Imported here because the module's import block does not provide them.
    import shutil
    import tempfile

    pack_dir = os.path.join(gitobjects_dir, "pack")
    # repack all into one; its exit status is deliberately not checked
    # because the unpacking below works on whatever packs exist.
    call(["git", "repack", "-a"])
    # remove packs to a tmpdir
    tmpdir = tempfile.mkdtemp(prefix="gitpack-", dir=os.getcwd())
    moved = []
    try:
        names = os.listdir(pack_dir) if os.path.isdir(pack_dir) else []
        for name in names:
            shutil.move(os.path.join(pack_dir, name),
                        os.path.join(tmpdir, name))
            moved.append(name)
        # unpack them
        for name in moved:
            if not name.endswith(".pack"):
                continue
            # Argument lists and an explicit stdin replace the shell
            # strings, so odd characters in paths cannot be reinterpreted.
            with open(os.path.join(tmpdir, name), "rb") as pack:
                if call(["git", "unpack-objects"], stdin=pack) != 0:
                    raise Exception(
                        "git unpack-objects failed for %s" % name)
    except Exception:
        # Do not lose the packs: put back everything that was moved.
        for name in moved:
            src = os.path.join(tmpdir, name)
            if os.path.exists(src):
                shutil.move(src, os.path.join(pack_dir, name))
        raise
    finally:
        # remove packs
        shutil.rmtree(tmpdir, ignore_errors=True)
def read_sha1_tree(path):
    """Read a loose tree object and return its entries.

    Each entry in the body is ``<mode> <name>\\x00`` followed by 20 raw
    SHA1 bytes.

    Args:
        path: location of the loose object file.

    Returns:
        A list of dicts with the keys ``mode``, ``name`` and ``sha1``
        (40 hex digits). ``mode`` is the mode's digit string read as a
        base-10 integer, which is the form write_sha1_tree() formats
        back with ``%d``.

    Raises:
        Exception: if the object is not a tree or an entry is cut short.
    """
    ot, olen, content = read_sha1_file(path)
    if ot != "tree":
        raise Exception("Not a tree object")
    tree_entries = []
    pos = 0
    while True:
        space = content.find(b"\x20", pos)
        if space < 0:
            break
        mode = int(content[pos:space])
        name_end = content.find(b"\x00", space + 1)
        if name_end < 0:
            raise Exception("corrupted tree object file")
        name = content[space + 1:name_end]
        sha1 = content[name_end + 1:name_end + 21]
        if len(sha1) != 20:
            # The body ended in the middle of the binary hash.
            raise Exception("corrupted tree object file")
        if not isinstance(name, str):
            # Python 3: hand callers text, as Python 2 did.
            name = name.decode("utf-8", "replace")
        tree_entries.append({
            "mode": mode,
            "name": name,
            "sha1": "".join("%02x" % b for b in bytearray(sha1)),
        })
        pos = name_end + 21
    return tree_entries
def write_sha1_tree(tree_entries):
    """Serialise tree entries and store them as a loose tree object.

    Args:
        tree_entries: list of dicts with the keys ``mode`` (integer,
            written with ``%d``), ``name`` and ``sha1`` (40 hex digits).

    Returns:
        The path of the object file that was written below
        ``default_gitobjects_dir``.

    Raises:
        Exception: if ``tree_entries`` is not a list or a ``sha1`` value is
            not 40 hex digits.
    """
    if not isinstance(tree_entries, list):
        raise Exception("write_sha1_tree: need a list")

    def as_bytes(text):
        return text if isinstance(text, bytes) else text.encode("utf-8")

    def from_hex(hexsha):
        if len(hexsha) != 40:
            raise Exception("write_sha1_tree: bad sha1")
        try:
            return bytes(bytearray(
                int(hexsha[i:i + 2], 16) for i in range(0, 40, 2)))
        except ValueError:
            raise Exception("write_sha1_tree: bad sha1")

    tree_content = b"".join(
        as_bytes("%d" % t["mode"]) + b"\x20" + as_bytes(t["name"])
        + b"\x00" + from_hex(t["sha1"])
        for t in tree_entries)
    # The header carries the byte length of the body.
    final = as_bytes("tree %d" % len(tree_content)) + b"\x00" + tree_content
    sha1 = hashlib.sha1(final).hexdigest()
    sha1path = "%s/%s" % (sha1[:2], sha1[2:])
    loose_object_path = os.path.join(default_gitobjects_dir, sha1path)
    # The two-character fan-out directory may not exist yet.
    parent = os.path.dirname(loose_object_path)
    if not os.path.isdir(parent):
        os.makedirs(parent)
    # Binary mode: the compressed object is raw bytes.
    with open(loose_object_path, "wb") as f:
        f.write(zlib.compress(final))
    return loose_object_path
def read_sha1_commit(path):
    """Read a loose commit object and return its fields as a dict.

    Args:
        path: location of the loose object file.

    Returns:
        A dict with the keys ``tree`` (hex sha1), ``author`` and
        ``committer`` (everything after the keyword on those header
        lines), ``content`` (the message with surrounding newlines
        removed) and ``time`` (the integer that is second from last on the
        author line). ``parents`` (list of hex sha1) is present only when
        the commit has parent lines.

    Raises:
        Exception: if the object is not a commit or its header is
            malformed.
    """
    ot, olen, content = read_sha1_file(path)
    if ot != "commit":
        raise Exception("Not a commit object")
    if not isinstance(content, str):
        # Python 3: the body arrives as bytes; work on text from here on.
        content = content.decode("utf-8", "replace")
    # The header ends at the FIRST blank line; the message itself may
    # contain further blank lines between paragraphs.
    hend = content.find("\n\n")
    if hend < 0:
        raise Exception("corrupted commit object")
    node = content[:hend].split("\n")
    message = content[hend + 2:]
    first = node[0].split("\x20")
    if len(first) != 2 or first[0] != "tree":
        raise Exception("corrupted commit object")
    commit_dict = {"tree": first[1]}
    for line in node[1:]:
        if line.startswith("parent "):
            commit_dict.setdefault("parents", []).append(
                line.split("\x20")[1])
        elif line.startswith("author "):
            commit_dict["author"] = line[7:]
        elif line.startswith("committer "):
            commit_dict["committer"] = line[10:]
    commit_dict["content"] = message.strip("\n")
    try:
        commit_dict["time"] = int(commit_dict["author"].split(" ")[-2])
    except (KeyError, IndexError, ValueError):
        raise Exception("corrupted commit object")
    return commit_dict
def write_sha1_commit(commit_object):
    """Serialise a commit dict and store it as a loose commit object.

    Args:
        commit_object: dict with the keys ``tree``, ``author``,
            ``committer``, ``time`` (integer) and ``content``; ``parents``
            (a list) is optional.

    Returns:
        The path of the object file that was written below
        ``default_gitobjects_dir``.
    """
    # NOTE(review): ``time`` is appended after ``author``/``committer``
    # here, while read_sha1_commit() keeps the timestamp inside those two
    # values. Presumably this function expects them without a timestamp;
    # confirm before round-tripping a dict from the reader.
    lines = ["tree %s\n" % commit_object["tree"]]
    if "parents" in commit_object:
        for p in commit_object["parents"]:
            lines.append("parent %s\n" % p)
    lines.append("author %s %d\n" % (
        commit_object["author"], commit_object["time"]))
    lines.append("committer %s %d\n" % (
        commit_object["committer"], commit_object["time"]))
    lines.append("\n%s\n" % commit_object["content"])
    commit_content = "".join(lines)
    if not isinstance(commit_content, bytes):
        commit_content = commit_content.encode("utf-8")
    # The header carries the byte length of the body.
    header = "commit %d" % len(commit_content)
    if not isinstance(header, bytes):
        header = header.encode("ascii")
    final = header + b"\x00" + commit_content
    sha1 = hashlib.sha1(final).hexdigest()
    sha1path = "%s/%s" % (sha1[:2], sha1[2:])
    loose_object_path = os.path.join(default_gitobjects_dir, sha1path)
    # The two-character fan-out directory may not exist yet.
    parent = os.path.dirname(loose_object_path)
    if not os.path.isdir(parent):
        os.makedirs(parent)
    # Binary mode: the compressed object is raw bytes.
    with open(loose_object_path, "wb") as f:
        f.write(zlib.compress(final))
    return loose_object_path
def sha1_to_path(object_dir, sha1):
    """Return the loose-object path for ``sha1`` below ``object_dir``.

    Args:
        object_dir: the objects directory.
        sha1: hex digest; its first two characters become the
            subdirectory and the rest the file name.

    Returns:
        ``"<object_dir>/<first two chars>/<remaining chars>"``.

    Raises:
        Exception: if ``object_dir`` is too long for the result to stay
            within ``PATH_MAX``.
    """
    # 43 is the allowance made for what gets appended to object_dir.
    if len(object_dir) + 43 > PATH_MAX:
        # Format the message here; a second positional argument would
        # leave the "%s" placeholder unexpanded.
        raise Exception("insanely long object directory %s" % object_dir)
    return "%s/%s/%s" % (object_dir, sha1[:2], sha1[2:])
def sha1_file_exists(object_dir, sha1part):
    """Resolve a full or abbreviated sha1 to the path of its loose object.

    Args:
        object_dir: the objects directory to search.
        sha1part: hex digest or a prefix of one; the first two characters
            select the subdirectory, the rest is matched as a file-name
            prefix.

    Returns:
        ``"<object_dir>/<xx>/<file>"`` for the single matching file.

    Raises:
        Exception: with the argument ``(1, "no match")`` when nothing
            matches, or ``(2, "multiple matches")`` when the prefix is
            ambiguous.
    """
    # Fewer than two characters cannot select a subdirectory.
    if len(sha1part) < 2:
        raise Exception((1, "no match"))
    prefix = "%s/%s" % (object_dir, sha1part[:2])
    if not os.path.isdir(prefix):
        raise Exception((1, "no match"))
    rest = sha1part[2:]
    candidates = [name for name in os.listdir(prefix)
                  if name.startswith(rest)]
    if not candidates:
        raise Exception((1, "no match"))
    if len(candidates) > 1:
        # Raised, not returned, so callers never mistake it for a path.
        raise Exception((2, "multiple matches"))
    return "%s/%s" % (prefix, candidates[0])
def build_objects_table(object_dir):
    """List every loose object found below ``object_dir``.

    Only subdirectories whose name is exactly two characters long are
    scanned. Each file in them is opened to read its type and length.

    Args:
        object_dir: the objects directory to scan.

    Returns:
        A list of dicts with the keys ``sha1`` (directory name plus file
        name), ``type`` and ``length``.

    Raises:
        Exception: if a directory name plus file name is not 40 characters.
    """
    table = []
    for fanout in os.listdir(object_dir):
        if len(fanout) == 2:
            bucket = path_join(object_dir, fanout)
            for leaf in os.listdir(bucket):
                digest = fanout + leaf
                if len(digest) != 40:
                    raise Exception("Not git object directory")
                # Header only; the body is not needed for the listing.
                kind, size = read_sha1_file(
                    path_join(bucket, leaf), content=False)
                table.append(dict(sha1=digest, type=kind, length=size))
    return table
| #build_objects_table(sys.argv[1]) | |
| #print read_sha1_file(sys.argv[1]) | |
| #print read_sha1_commit(sys.argv[1]) | |
| #print tree_entries | |
| #print read_sha1_file(sys.argv[1], False) | |
def _print_raw(data):
    """Write raw object content plus a newline to standard output."""
    sys.stdout.flush()
    # Python 3 keeps the byte stream on ``buffer``; Python 2 has no such
    # attribute and accepts byte strings directly.
    stream = getattr(sys.stdout, "buffer", sys.stdout)
    if not isinstance(data, bytes):
        data = data.encode("utf-8")
    stream.write(data)
    stream.write(b"\n")


def _read_ref(path):
    """Return the text stored in the ref file at ``path``, stripped."""
    with open(path, "r") as f:
        return f.read().strip()


def _iter_ref_files(root):
    """Yield ``(name relative to root, file path)`` for files below root."""
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()
        for fname in sorted(filenames):
            full = os.path.join(dirpath, fname)
            yield os.path.relpath(full, root).replace(os.sep, "/"), full


def _show_commit(sha1):
    """Print the commit stored under ``sha1`` in the default objects dir."""
    simple_print(read_sha1_commit(
        sha1_file_exists(default_gitobjects_dir, sha1)))


def _load_commits(objects_dir):
    """Return ``(sha1, commit dict)`` pairs for loose commits, newest first."""
    pairs = []
    for entry in build_objects_table(objects_dir):
        if entry["type"] == "commit":
            commit = read_sha1_commit(
                sha1_file_exists(objects_dir, entry["sha1"]))
            pairs.append((entry["sha1"], commit))
    pairs.sort(key=lambda pair: pair[1]["time"], reverse=True)
    return pairs


def _resolve_target(arg):
    """Turn the second command-line argument into a path where possible.

    An argument containing "/" is taken as a path and must exist. Anything
    else is tried as a sha1 (or prefix) in the default objects directory
    and falls back to the argument itself when that lookup fails.
    """
    if arg is None:
        return None
    if "/" in arg:
        if not os.path.exists(arg):
            print("%s doesn't exists" % arg)
            sys.exit(1)
        return arg
    try:
        found = sha1_file_exists(default_gitobjects_dir, arg)
    except Exception:
        return arg
    # Anything other than a path string means the lookup was inconclusive.
    return found if isinstance(found, str) else arg


def _need(target, message):
    """Exit with ``message`` when a required argument is missing."""
    if not target:
        print(message)
        sys.exit(1)


def _list_refs(refs_dir):
    """Print the commit behind every head, tag, remote ref and the stash."""
    for i in os.listdir(refs_dir):
        # Exact names only; a substring test would also accept "s" or "tag".
        if i in ("heads", "tags"):
            for name, ref_path in _iter_ref_files(os.path.join(refs_dir, i)):
                print("=========")
                if i == "heads":
                    print("branch: %s" % name)
                else:
                    print("tag: %s" % name)
                _show_commit(_read_ref(ref_path))
        elif i == "remotes":
            remotes = os.path.join(refs_dir, i)
            for j in os.listdir(remotes):
                for name, ref_path in _iter_ref_files(
                        os.path.join(remotes, j)):
                    print("==========")
                    print("remotes/%s" % name)
                    sha1 = _read_ref(ref_path)
                    if "ref:" not in sha1:
                        _show_commit(sha1)
                    else:
                        # Symbolic ref: show what it points at.
                        print(sha1)
        elif i == "stash":
            print("=========")
            print("stash: %s" % i)
            _show_commit(_read_ref(os.path.join(refs_dir, i)))


def _all_versions(pattern):
    """Print every tree entry and index entry whose name matches pattern."""
    matcher = re.compile(pattern)
    for _, commit in _load_commits(default_gitobjects_dir):
        commit["time"] = time.ctime(commit["time"])
        tree = read_sha1_tree(
            sha1_file_exists(default_gitobjects_dir, commit["tree"]))
        for in_tree in tree:
            if matcher.match(in_tree["name"]):
                print("=============")
                print("in commit: ")
                simple_print(commit)
                print("%s %s" % (in_tree["sha1"], in_tree["name"]))
    for i in cache_area.read_from_disk(
            os.path.join(default_git_dir, "index")):
        if matcher.match(i["name"]):
            print("=============")
            print("in cache: ")
            print("%s %s" % (i["sha1"], i["name"]))


def _main(argv):
    """Command-line entry point; ``argv`` has the shape of ``sys.argv``."""
    if len(argv) <= 1:
        print("usage: python sha1_file.py [option] sha1")
        sys.exit(1)
    option = argv[1]
    raw_arg = argv[2] if len(argv) > 2 else None

    if option == "--all-version":
        # The argument is a regular expression here, so it must not go
        # through the path/sha1 resolution the other options use.
        if not raw_arg:
            raise Exception("which file do you want?")
        _all_versions(raw_arg)
        return

    sha1_file = _resolve_target(raw_arg)

    if option == "-g":
        _need(sha1_file, "-g: need the path of a git sha1 file, "
                         "or it's sha1 if we are in a git repo ")
        git_object = read_sha1_file(sha1_file)
        if git_object[0] == "blob":
            _print_raw(git_object[2])
        else:
            simple_print(git_object)
    elif option == "-t":
        _need(sha1_file, "-t: need the path of a git tree file, "
                         "or it's sha1 if we are in a git repo ")
        simple_print(read_sha1_tree(sha1_file))
    elif option == "-c":
        _need(sha1_file, "-c: need the path of a git commit file, "
                         "or sha1 if we are in a git repo ")
        simple_print(read_sha1_commit(sha1_file))
    elif option == "-a":
        for i in build_objects_table(sha1_file or default_gitobjects_dir):
            print("%s %s %s" % (i["sha1"], i["type"], i["length"]))
    elif option == "-i":
        _need(sha1_file, "-i: need the path of a git idx file")
        idx_dict = read_idx_file(sha1_file)
        print("total %s" % idx_dict["total"])
        print("packsha1 %s" % idx_dict["packsha1"])
    elif option == "--list-commit":
        # NOTE(review): an objects directory given on the command line has
        # never been honoured by this option; confirm whether it should be.
        for sha1, commit in _load_commits(default_gitobjects_dir):
            print("==========")
            print("commit %s" % sha1)
            commit["time"] = time.ctime(commit["time"])
            simple_print(commit)
    elif option == "--list-tree":
        # NOTE(review): same as --list-commit, the argument is not used.
        for i in build_objects_table(default_gitobjects_dir):
            if i["type"] == "tree":
                print("==========")
                print("tree %s" % i["sha1"])
                simple_print(read_sha1_tree(
                    sha1_file_exists(default_gitobjects_dir, i["sha1"])))
    elif option == "--list-refs":
        _list_refs(sha1_file or default_gitrefs_dir)
    elif option == "--extract-all":
        extract_git_pack(sha1_file or default_gitobjects_dir)
    else:
        print("unknown option: %s" % option)
        sys.exit(1)


if __name__ == "__main__":
    _main(sys.argv)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment