Created
June 15, 2022 09:08
-
-
Save Ext3h/1c2125ba838b8cb4ac88b1555ba78ba9 to your computer and use it in GitHub Desktop.
PDB index from archive
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Print symstore hash values for PDB and EXE/DLL files. | |
Usage: | |
python symhash.py [file1 [file2 ...]] | |
This script requires that 'pdbparse' and 'pefile' python packages | |
are installed. To install required packages with pip run: | |
pip install pdbparse pefile | |
""" | |
from fileinput import filename | |
from os import path, makedirs, unlink | |
import pdbparse | |
import pefile | |
import sys | |
import glob | |
import tempfile | |
import mmap | |
from zipfile import ZipFile | |
from tarfile import TarFile | |
from concurrent.futures import ThreadPoolExecutor | |
# Extensions are stored lower-case and without the leading dot, because that
# is the form the visitors compare against.

# Files parsed as program databases.
_symbol_ext = {'pdb'}

# Files parsed as Portable Executable images.
_binary_ext = {
    'exe',
    'dll',
    'adtfplugin',
    'adtftool',
    'adtffileplugin',
}

# Everything worth pulling out of an archive: symbols plus binaries.
_extract_ext = _symbol_ext | _binary_ext
def _pdb_hash(filename):
    """
    Get symstore hash value for a program database (PDB) file.

    Open and parse required parts of the file to calculate
    the symstore hash value for the file.

    The value is the PDB GUID as 32 upper-case hex digits followed by the
    PDB age in hexadecimal without leading zeros.

    filename -- path of the PDB file to read.

    Returns the hash as a string. Errors from opening or parsing the file
    propagate to the caller.
    """
    with open(filename, 'rb') as f:
        pdb = pdbparse.PDB7(f, fast_load=True)
        # fast_load skips stream parsing, so the PDB info stream has to be
        # loaded explicitly before its fields can be read.
        pdb.STREAM_PDB.load()
        info = pdb.STREAM_PDB
        guid = info.GUID
        guid_str = "%.8X%.4X%.4X%s" % (guid.Data1, guid.Data2, guid.Data3,
                                       guid.Data4.hex().upper())
        age = info.Age
    # The age is part of the key in hexadecimal; formatting it as decimal
    # produces a wrong key as soon as the age reaches 10.
    return "%s%X" % (guid_str, age)
def _pe_hash(filename):
    """
    Get symstore hash value for a Portable Executable (PE) file.

    Open and parse required parts of the file to calculate
    the symstore hash value for the file.

    The value is the COFF TimeDateStamp as exactly eight upper-case hex
    digits followed by SizeOfImage in lower-case hex without padding.

    filename -- path of the PE file to read.

    Returns the hash as a string. Errors from opening, mapping or parsing
    the file propagate to the caller.
    """
    with open(filename, 'rb') as f:
        # Map the file read-only so pefile can parse the headers without
        # the whole image being read into memory first.
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            pe = pefile.PE(data=mm, fast_load=True)
            timestamp = pe.FILE_HEADER.TimeDateStamp
            image_size = pe.OPTIONAL_HEADER.SizeOfImage
    # The timestamp must be zero-padded to eight digits: small values (and
    # the pseudo-timestamps of reproducible builds) otherwise yield a key
    # that is too short.
    return "%08X%x" % (timestamp, image_size)
def _pdb_hash_from_pe(filename):
    """
    Get symstore hash value for a program database (PDB) file, taking
    it from the Portable Executable (PE) file associated with it.

    Open and parse required parts of the file to calculate
    the symstore hash value for the file.

    The value is built from the first CV_INFO_PDB70 debug record found in
    the PE file: the GUID as upper-case hex followed by the age in
    hexadecimal without leading zeros.

    filename -- path of the PE file to read.

    Returns the hash as a string.
    Raises AssertionError if the PE file carries no CV_INFO_PDB70 record
    (same exception type as before, but now also raised when Python runs
    with -O). Errors from opening or parsing the file propagate.
    """
    def is_pdb70_info(s):
        return isinstance(s, pefile.Structure) and s.name == 'CV_INFO_PDB70'

    with open(filename, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            # NOTE: if we use fast load, it will not parse the DIRECTORY_ENTRY_DEBUG
            pe = pefile.PE(data=mm, fast_load=False)
            # The attribute is read defensively so a PE without a debug
            # directory is reported the same way as one without a PDB70
            # record.
            debug_entries = getattr(pe, 'DIRECTORY_ENTRY_DEBUG', ())
            entry = next(
                (item.entry for item in debug_entries
                 if is_pdb70_info(item.entry)),
                None,
            )
            if entry is None:
                # Raised explicitly instead of via `assert`, which is
                # stripped under -O.
                raise AssertionError(
                    "This PE file does not have CV_INFO_PDB70 data")
            guid_str = "%.8X%.4X%.4X%s" % (
                entry.Signature_Data1, entry.Signature_Data2,
                entry.Signature_Data3, entry.Signature_Data4.hex().upper()
            )
            age = entry.Age
    # The age is part of the key in hexadecimal, not decimal.
    return "%s%X" % (guid_str, age)
class SymbolLink(object):
    """
    Association between a file location and its symstore hash.

    Attributes:
    filename -- absolute form of the path passed to the constructor.
    basename -- last path component of the path passed to the constructor.
    sym_hash -- hash value supplied by the caller, stored unchanged.
    """

    # Class-level defaults; every instance overrides them in __init__.
    filename = ""
    basename = ""
    sym_hash = ""

    def __init__(self, filename, sym_hash):
        _, leaf = path.split(filename)
        self.sym_hash = sym_hash
        self.basename = leaf
        self.filename = path.abspath(filename)
def transform_tar_entry(sym_link, member_name, zip_name):
    """
    Re-label a link found in an extracted tar member.

    Builds a new SymbolLink whose path is "<zip_name>!<member_name>" and
    whose hash is copied from sym_link.

    sym_link    -- link produced for the temporarily extracted file.
    member_name -- name of the member inside the archive.
    zip_name    -- name of the archive itself.
    """
    archive_path = "!".join((zip_name, member_name))
    return SymbolLink(archive_path, sym_link.sym_hash)
def visit_tar(filename, callback, fileobj = None):
    """
    Visit every symbol or binary file stored in a tar archive.

    Each regular member whose extension is listed in _extract_ext is
    written to a temporary directory, handed to visit_file and removed
    again. Links are reported to callback with the path rewritten by
    transform_tar_entry.

    filename -- name of the archive; also used as the archive part of the
                reported paths.
    callback -- called with one SymbolLink per hashed member.
    fileobj  -- optional open file object passed on to TarFile.open.
    """
    with tempfile.TemporaryDirectory() as tmp, \
            TarFile.open(name=filename, fileobj=fileobj) as archive:
        for member in archive.getmembers():
            if not member.isfile():
                continue
            suffix = path.splitext(member.name)[1][1:].lower()
            if suffix not in _extract_ext:
                continue

            def forward(sym_link, member_name=member.name):
                # Swap the temporary path for the in-archive location.
                return callback(
                    transform_tar_entry(sym_link, member_name, filename))

            # Members are flattened into the temporary directory; each one
            # is deleted right after being visited.
            extracted_name = path.join(tmp, path.basename(member.name))
            archive.makefile(member, extracted_name)
            visit_file(extracted_name, forward)
            unlink(extracted_name)
def transform_zip_entry(sym_link, tmp_dir, zip_name):
    """
    Re-label a link found in an extracted zip member.

    Builds a new SymbolLink whose path is "<zip_name>!<relative path>",
    where the relative path is sym_link.filename taken relative to
    tmp_dir, and whose hash is copied from sym_link.

    sym_link -- link produced for the temporarily extracted file.
    tmp_dir  -- directory the member was extracted into.
    zip_name -- name of the archive itself.
    """
    inner_path = path.relpath(sym_link.filename, tmp_dir)
    archive_path = "!".join((zip_name, inner_path))
    return SymbolLink(archive_path, sym_link.sym_hash)
def visit_zip(filename, callback, fileobj = None):
    """
    Visit every symbol or binary file stored in a zip archive.

    Each non-directory member whose extension is listed in _extract_ext is
    extracted to a temporary directory, handed to visit_file and removed
    again. Links are reported to callback with the path rewritten by
    transform_zip_entry.

    filename -- name of the archive; also used as the archive part of the
                reported paths.
    callback -- called with one SymbolLink per hashed member.
    fileobj  -- optional file object to read the archive from; when it is
                missing or falsy the archive is opened by filename.
    """
    source = fileobj or filename

    def forward(sym_link):
        # Swap the temporary path for the in-archive location.
        return callback(transform_zip_entry(sym_link, tmp, filename))

    with tempfile.TemporaryDirectory() as tmp:
        with ZipFile(source) as archive:
            for member in archive.infolist():
                if member.is_dir():
                    continue
                suffix = path.splitext(member.filename)[1][1:].lower()
                if suffix not in _extract_ext:
                    continue
                extracted_name = path.normpath(archive.extract(member, tmp))
                visit_file(extracted_name, forward)
                # Each extracted file is deleted right after being visited.
                unlink(extracted_name)
def visit_file(filename, callback, fileobj = None):
    """
    Hash a single file, or every supported file inside an archive.

    The file type is derived from the file name only:
    - extensions in _symbol_ext are hashed with _pdb_hash,
    - extensions in _binary_ext are hashed with _pe_hash,
    - .zip archives are handed to visit_zip,
    - .tar, .tgz and .tar.gz archives are handed to visit_tar,
    - anything else is ignored.

    filename -- path of the file to inspect.
    callback -- called with one SymbolLink per hashed file.
    fileobj  -- optional open file object, forwarded to the archive
                visitors only.

    Any exception raised while processing is caught, printed and followed
    by an "unsupported file" line, so one bad file does not stop a scan.
    """
    # we are using filename extension to figure out
    # file's image type
    file_ext = path.splitext(filename)[1][1:].lower()
    # splitext only yields the last suffix ("gz" for "x.tar.gz"), so the
    # double extension has to be matched against the whole name.
    is_tar = (file_ext in {"tar", "tgz"}
              or path.basename(filename).lower().endswith(".tar.gz"))
    try:
        # get the image specific hash value
        if file_ext in _symbol_ext:
            callback(SymbolLink(filename, _pdb_hash(filename)))
        elif file_ext in _binary_ext:
            callback(SymbolLink(filename, _pe_hash(filename)))
        elif file_ext == "zip":
            visit_zip(filename, callback, fileobj = fileobj)
        elif is_tar:
            visit_tar(filename, callback, fileobj = fileobj)
    except Exception as e:
        # Deliberate best effort: report the failure and carry on.
        print(repr(e))
        print("%s: unsupported file" % filename)
def create_file_link(sym_link):
    """
    Record a symbol link in the on-disk index.

    Creates the directory index/<basename>/<sym_hash> below the current
    working directory if needed and writes sym_link.filename into a
    "file.ptr" file inside it, replacing any previous content.

    sym_link -- object providing basename, sym_hash and filename.

    Raises OSError if the directory or file cannot be created.
    """
    sym_dir = path.join("index", sym_link.basename, sym_link.sym_hash)
    # exist_ok makes concurrent creation by several worker threads safe
    # without the exists()/makedirs() race of a manual check.
    makedirs(sym_dir, exist_ok=True)
    with open(path.join(sym_dir, "file.ptr"), "w") as f:
        f.write(sym_link.filename)
def _index_file(filename):
    """Hash one file (or archive) and store every result in the index."""
    visit_file(filename, create_file_link)


def main(argv=None):
    """
    Index every file or directory named on the command line.

    argv -- list of paths; defaults to sys.argv[1:].

    A directory is searched recursively for files whose name contains a
    dot, and the matches are processed on a thread pool. Any other path is
    processed directly.
    """
    if argv is None:
        argv = sys.argv[1:]
    for fname in argv:
        if path.isdir(fname):
            # path.join keeps the pattern portable; a hard-coded backslash
            # pattern only works on Windows and relies on invalid string
            # escapes.
            pattern = path.join(path.abspath(fname), '**', '*.*')
            fnames = glob.glob(pattern, recursive=True)
            with ThreadPoolExecutor() as executor:
                # Consume the results so that an exception raised in a
                # worker is re-raised here instead of being dropped.
                for _ in executor.map(_index_file, fnames):
                    pass
        else:
            _index_file(fname)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment