-
-
Save valkheim/dcaee7e85d24113ca12564b2ac4881dd to your computer and use it in GitHub Desktop.
Download PDB and PE files from the Microsoft symbol store
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import sys | |
import logging | |
import argparse | |
import subprocess | |
import requests | |
# Base URL of the symbol store; the script entry point passes it as the
# `url` argument of try_download_pdb().
MICROSOFT_SYMBOL_STORE = "https://msdl.microsoft.com/download/symbols"
def try_download_pdb(url, filename, guid, output_filename, timeout=60):
    """Download one PDB from a symbol store and write it to disk.

    The requested URL is ``<url>/<filename>/<guid>/<filename>``.

    Args:
        url: Base URL of the symbol store.
        filename: Name of the PDB file; used twice in the URL.
        guid: Identifier path component of the PDB inside the store.
        output_filename: Destination path; missing parent directories are
            created.
        timeout: Seconds to wait for the server before ``requests`` raises.

    Returns:
        The HTTP status code. The file is written only when it is 200.
    """
    pdb_url = "{url:s}/{filename:s}/{guid:s}/{filename:s}".format(
        url=url,
        guid=guid,
        filename=filename,
    )
    logging.debug("[-] testing url : %s", pdb_url)

    # The context manager releases the streamed connection on every path.
    with requests.get(pdb_url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            logging.warning("[x] not found pdb at url : %s", pdb_url)
            return response.status_code

        logging.info("[+] found pdb at url : %s", pdb_url)

        # A bare file name has no directory part and os.makedirs("") fails.
        output_dir = os.path.dirname(output_filename)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        with open(output_filename, 'wb') as f:
            for data in response.iter_content(32 * 1024):
                f.write(data)
        return response.status_code
if __name__ == '__main__':
    # Script entry point: read a whitespace-separated list of PDB identifiers
    # from the --pdb file and try to download --name for each of them into
    # <--dir>/<identifier>/<--name>.
    logging.getLogger('requests').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)

    # The first positional parameter of ArgumentParser is `prog`; pass the
    # text as `description` so the usage line keeps the real program name.
    arg_parser = argparse.ArgumentParser(description="download pe from PDB file.")
    # All three options are needed below; make argparse report a missing one
    # instead of failing later on open(None) / os.path.join(None, ...).
    arg_parser.add_argument("--name", type=str, required=True, help="pdb filename")
    arg_parser.add_argument("--pdb", type=str, required=True, help="path to input pdb GUID file")
    arg_parser.add_argument("--dir", type=str, required=True, help="path to root directory")
    arg_parser.add_argument("-v", "--verbose", action="store_true", help="verbose output : activate debug logging.")
    args = arg_parser.parse_args()

    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.INFO)

    with open(args.pdb, "r") as pdb_guids_fd:
        pdb_guids = pdb_guids_fd.read()

    for pdb_guid in pdb_guids.split():
        output_filename = os.path.join(args.dir, pdb_guid, args.name)
        try_download_pdb(
            MICROSOFT_SYMBOL_STORE,
            args.name,
            pdb_guid,
            output_filename,
        )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import sys | |
import argparse | |
import subprocess | |
import logging | |
import time | |
import struct | |
import multiprocessing | |
import shutil | |
import random | |
import requests | |
import yaml | |
import pefile | |
# Base URL of the symbol store queried by bulk_download().
MICROSOFT_SYMBOL_STORE = "https://msdl.microsoft.com/download/symbols"
# Granularity (4 KiB) used by extract_image_size_from_pdb() to align the
# reconstructed image size.
PAGE_SIZE = 4*1024
def LOWORD(dword):
    """Return the low-order 16 bits of *dword*."""
    low_word_mask = 0xFFFF
    return dword & low_word_mask
def HIWORD(dword):
    """Return the high-order 16 bits of a 32-bit value.

    The result is masked to 16 bits so that a value wider than 32 bits still
    yields a word, mirroring LOWORD().
    """
    return (dword >> 16) & 0xFFFF
def get_product_version(pe):
    """Return the product version of a parsed PE as one 64-bit integer.

    Args:
        pe: Object exposing ``VS_FIXEDFILEINFO`` with ``ProductVersionMS`` and
            ``ProductVersionLS`` (a ``pefile.PE`` in this script).

    Returns:
        ``(ProductVersionMS << 32) + ProductVersionLS``.
    """
    # https://stackoverflow.com/a/16076661/1741450
    fixed_info = pe.VS_FIXEDFILEINFO
    # Newer pefile releases expose VS_FIXEDFILEINFO as a list of structures,
    # older ones as a single structure; accept both.
    if isinstance(fixed_info, (list, tuple)):
        fixed_info = fixed_info[0]
    ms = fixed_info.ProductVersionMS
    ls = fixed_info.ProductVersionLS
    return (ms << 32) + ls
def get_guid(dll):
    """Build the PDB identifier (GUID digits followed by age) of a parsed PE.

    The value is read from the raw data of the first debug directory entry.

    Args:
        dll: Object exposing ``DIRECTORY_ENTRY_DEBUG``, ``get_dword_at_rva``
            and ``get_word_at_rva`` (a ``pefile.PE`` in this script).

    Returns:
        Uppercase hex string made of the 32 GUID digits followed by the age
        without leading zeros, or None when the debug data cannot be read
        (the error is printed).
    """
    # https://gist.github.com/steeve85/2665503
    try:
        rva = dll.DIRECTORY_ENTRY_DEBUG[0].struct.AddressOfRawData
        # The GUID starts 4 bytes into the record (presumably right after the
        # CodeView "RSDS" signature).
        guid_rva = rva + 4
        parts = [
            '%08X' % dll.get_dword_at_rva(guid_rva),
            '%04X' % dll.get_word_at_rva(guid_rva + 4),
            '%04X' % dll.get_word_at_rva(guid_rva + 6),
        ]
        # The last 8 GUID bytes are printed in file order, so every word read
        # as little-endian has to be byte-swapped before formatting.
        for offset in range(8, 16, 2):
            word = dll.get_word_at_rva(guid_rva + offset)
            parts.append('%04X' % struct.unpack('<H', struct.pack('>H', word))[0])
        # Age is a 32-bit field: reading a word would truncate ages > 0xFFFF.
        parts.append('%X' % dll.get_dword_at_rva(guid_rva + 16))
    except (AttributeError, IndexError, TypeError, struct.error) as e:
        # AttributeError: no debug directory; IndexError: empty directory;
        # TypeError / struct.error: an RVA read returned None.
        print(e)
        return None
    return ''.join(parts).upper()
def extract_timestamp_from_pdb(pdb_file):
    """ Extract the PdbStream signature (which is the PE "timestamp") from a pdb

    Runs the ``llvm-pdbutil-x64.exe`` binary located next to this script with
    ``pdb2yaml -pdb-stream`` and parses its YAML output.

    Args:
        pdb_file: Path to the PDB file.

    Returns:
        The ``PdbStream.Signature`` value of the YAML document.

    Raises:
        subprocess.CalledProcessError: the tool exited with a non-zero status.
    """
    pdbutil_path = os.path.join(
        os.path.abspath(os.path.dirname(__file__)),
        "llvm-pdbutil-x64.exe"
    )

    # An argument list (instead of a formatted string) keeps paths containing
    # spaces or quotes intact, including the path of the tool itself.
    command = [pdbutil_path, "pdb2yaml", "-pdb-stream", pdb_file]
    output = subprocess.check_output(command)

    # safe_load: plain yaml.load() needs an explicit Loader on recent PyYAML
    # and can build arbitrary objects; only plain mappings are needed here.
    pdb_stream = yaml.safe_load(output)
    return pdb_stream['PdbStream']['Signature']
def extract_image_size_from_pdb(pdb_file):
    """ Extract the PE sections headers from a pdb file and try to recompute the image size

    Runs the ``cvdump.exe`` binary located next to this script with
    ``-headers``, parses every ``SECTION HEADER #n`` entry found between the
    ``*** SECTION HEADERS`` and ``*** ORIGINAL SECTION HEADERS`` markers and
    rounds the highest section end (virtual address + virtual size) up to
    PAGE_SIZE.

    Args:
        pdb_file: Path to the PDB file.

    Returns:
        The page-aligned end of the last section, as an int.

    Raises:
        subprocess.CalledProcessError: the tool exited with a non-zero status.
        ValueError: the tool output contains no parsable section header.
    """
    cvdump_path = os.path.join(
        os.path.abspath(os.path.dirname(__file__)),
        "cvdump.exe"
    )

    # An argument list keeps paths containing spaces or quotes intact.
    command = [cvdump_path, "-headers", pdb_file]

    # dump PE sections in one text block
    output = subprocess.check_output(command)
    # Only ASCII fields are parsed; do not fail on a stray non-ASCII byte.
    output = output.decode('ascii', errors='replace')

    # locate sections headers information
    idx_section_headers = output.find('*** SECTION HEADERS')
    if idx_section_headers == -1:
        raise ValueError("no '*** SECTION HEADERS' block in cvdump output for %s" % pdb_file)
    idx_orig_section_headers = output.find('*** ORIGINAL SECTION HEADERS')
    if idx_orig_section_headers == -1:
        # No trailing block: the section headers run to the end of the dump.
        idx_orig_section_headers = len(output)

    section_headers = output[idx_section_headers:idx_orig_section_headers]
    sections_headers = re.split(r"SECTION HEADER #", section_headers)[1:]
    if not sections_headers:
        raise ValueError("no section header found in cvdump output for %s" % pdb_file)

    # One field per line; accept both "\r\n" and "\n" line endings.
    header_regex = re.compile(r"\r?\n".join([
        r"(\d+)",
        r" *([\.\w]+) name",
        r" *([0-9A-F]+) virtual size",
        r" *([0-9A-F]+) virtual address",
        r" *([0-9A-F]+) size of raw data",
        r" *([0-9A-F]+) file pointer to raw data",
        r" *([0-9A-F]+) file pointer to relocation table",
        r" *([0-9A-F]+) file pointer to line numbers",
        r" *([0-9A-F]+) number of relocations",
        r" *([0-9A-F]+) number of line numbers",
        r" *([0-9A-F]+) flags",
    ]))

    # Parse every section header and return the max vaddr present
    max_virtual_address = 0
    for hdr in sections_headers:
        header = header_regex.match(hdr)
        if header is None:
            raise ValueError("unexpected section header layout in cvdump output for %s" % pdb_file)

        vsize, vaddr = header.group(3), header.group(4)
        vaddr_end_section = int(vaddr, 16) + int(vsize, 16)
        max_virtual_address = max(max_virtual_address, vaddr_end_section)

    # Round up to the next page boundary; an already aligned value is kept
    # as is instead of being pushed one full page further.
    return (max_virtual_address + PAGE_SIZE - 1) // PAGE_SIZE * PAGE_SIZE
def relocate_dll(filepath, filename):
    """Move a PE file into the folder named after the GUID it embeds.

    The destination is ``<grand-parent of filepath>/<GUID>/<filename>`` where
    the GUID is read from the PE with get_guid(); the folder is created when
    missing.

    Args:
        filepath: Current path of the PE file.
        filename: File name to give to the PE inside the destination folder.

    Raises:
        ValueError: no GUID could be read from the PE, so there is no
            destination folder to move it to.
    """
    pe_obj = pefile.PE(filepath)
    try:
        pe_pdb_GUID = get_guid(pe_obj)
    finally:
        # Always close the PE before the file is moved.
        pe_obj.close()

    if pe_pdb_GUID is None:
        raise ValueError("cannot relocate %s : no PDB GUID found" % filepath)

    new_GUID_folder = os.path.join(
        os.path.dirname(os.path.dirname(filepath)),
        pe_pdb_GUID,
    )
    new_GUID_dll = os.path.join(new_GUID_folder, filename)

    os.makedirs(new_GUID_folder, exist_ok=True)
    shutil.move(filepath, new_GUID_dll)
def try_download_pe(url, filename, timestamp, image_size, output_filename, retries=5, timeout=60):
    """Try to download a PE from a symbol store for one timestamp/size pair.

    The requested URL is ``<url>/<filename>/<TIMESTAMP><IMAGE_SIZE>/<filename>``
    with both numbers written as uppercase hex without padding.

    Args:
        url: Base URL of the symbol store.
        filename: Name of the PE file; used twice in the URL.
        timestamp: Integer timestamp component of the store key.
        image_size: Integer image size component of the store key.
        output_filename: Path the PE is written to on success.
        retries: Number of attempts made when the connection fails.
        timeout: Seconds to wait for the server on each attempt.

    Returns:
        The HTTP status code of the first completed request (the file is
        written only when it is 200), or None when every attempt failed with
        a connection error or timeout.
    """
    pe_url = "{url:s}/{filename:s}/{timestamp:s}{image_size:s}/{filename:s}".format(
        url=url,
        filename=filename,
        timestamp="%X" % timestamp,
        image_size="%X" % image_size,
    )

    for attempt in range(1, retries + 1):
        try:
            # The context manager releases the streamed connection.
            with requests.get(pe_url, stream=True, timeout=timeout) as response:
                if response.status_code != 200:
                    return response.status_code

                with open(output_filename, 'wb') as f:
                    for data in response.iter_content(32 * 1024):
                        f.write(data)
                return response.status_code

        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as error:
            # Best effort: record the failure and retry.
            logging.debug("[x] attempt %d/%d failed for %s : %s", attempt, retries, pe_url, error)

    logging.warning("[x] giving up on %s after %d failed attempts", pe_url, retries)
    return None
def try_download_pe_routine(params):
    """Single-argument adapter used with ``Pool.map``.

    Unpacks one ``(url, filename, timestamp, image_size, output_filename)``
    job and runs the fuzzy-timestamp download on it; the outcome of the
    download is not returned.
    """
    (store_url, pe_name, pdb_timestamp, size_of_image, destination) = params
    try_download_pe_fuzzy_timestamp(
        store_url,
        pe_name,
        pdb_timestamp,
        size_of_image,
        destination,
    )
def try_download_pe_fuzzy_timestamp(url, filename, timestamp, image_size, output_filename,
                                    fuzz_range=5000, fuzz_before=10):
    """Search for a PE around a timestamp and check it against its PDB GUID.

    Every timestamp in ``[timestamp - fuzz_before, timestamp + fuzz_range)``
    is tried with try_download_pe() until one request answers 200. The GUID
    of the downloaded PE is then compared with the name of the folder that
    contains ``output_filename``; on a mismatch the PE is moved away with
    relocate_dll().

    Args:
        url: Base URL of the symbol store.
        filename: Name of the PE file to download.
        timestamp: Timestamp the search is centred on.
        image_size: Image size component of the store key.
        output_filename: Path the PE is written to.
        fuzz_range: Number of timestamps tried from ``timestamp`` upwards.
        fuzz_before: Number of timestamps tried below ``timestamp``.

    Returns:
        True when a PE was downloaded and its GUID matches the folder name,
        False otherwise.
    """
    first_timestamp = timestamp - fuzz_before
    stop_timestamp = timestamp + fuzz_range

    for test_timestamp in range(first_timestamp, stop_timestamp):
        status_code = try_download_pe(url, filename, test_timestamp, image_size, output_filename)
        if status_code != 200:
            continue

        pdb_guid = os.path.basename(os.path.dirname(output_filename))
        pe_obj = pefile.PE(output_filename)
        try:
            pe_pdb_GUID = get_guid(pe_obj)
        finally:
            # Close the PE before it may be moved by relocate_dll().
            pe_obj.close()

        if pdb_guid != pe_pdb_GUID:
            print("[x] Found matching fuzzy timestamp (%x) but the GUIDs don't match (%s != %s)" % (test_timestamp, pdb_guid, pe_pdb_GUID))
            relocate_dll(output_filename, filename)
            return False

        print("[!] Found matching fuzzy timestamp %s : %x" % (output_filename, test_timestamp))
        return True

    # Report the last timestamp actually tried (the range end is exclusive).
    print("[x] Could not find a matching timestamp in : [%x, %x] for %s" % (first_timestamp, stop_timestamp - 1, output_filename))
    return False
def bulk_download(root_pdb_folder, pe_filename = None):
    """Download the PE matching every PDB found under a folder.

    Walks ``root_pdb_folder``, and for each ``*.pdb`` file that has no PE next
    to it, extracts the timestamp and image size from the PDB and queues a
    fuzzy-timestamp download. The queue is shuffled and processed by a pool
    of at most 15 worker processes.

    Args:
        root_pdb_folder: Root of the folder tree to scan.
        pe_filename: Name of the PE to look for and download. When omitted,
            it is derived from each PDB name as ``<pdb name>.dll``.
    """
    pdb_found = []

    # enumerate all the pdb files without an associated pe
    for root, folders, files in os.walk(root_pdb_folder):
        for pdb_file in files:
            if not pdb_file.endswith(".pdb"):
                continue

            pdb_filepath = os.path.join(root, pdb_file)
            pdb_name, _ = os.path.splitext(os.path.basename(pdb_filepath))

            # Derive the name per PDB: assigning to `pe_filename` itself would
            # make the first derived name stick for every following PDB.
            target_pe_filename = pe_filename if pe_filename else "%s.dll" % pdb_name

            pe_filepath = os.path.join(
                os.path.dirname(pdb_filepath),
                "%s" % target_pe_filename
            )

            if os.path.exists(pe_filepath):
                continue

            try:
                pdb_timestamp = extract_timestamp_from_pdb(pdb_filepath)
                pdb_size_of_image = extract_image_size_from_pdb(pdb_filepath)
            except subprocess.CalledProcessError as cpe:
                # Best effort: skip this PDB, but leave a trace of the failure.
                logging.warning("[x] Could not read pdb file : %s (%s)", pdb_filepath, cpe)
                continue

            logging.info("[-] Found pdb file : %s (%x %x)" % (pdb_filepath, pdb_timestamp, pdb_size_of_image))
            pdb_found.append((MICROSOFT_SYMBOL_STORE, target_pe_filename, pdb_timestamp, pdb_size_of_image, pe_filepath))

    if not pdb_found:
        # Nothing to download: do not start any worker process.
        return

    random.shuffle(pdb_found)
    with multiprocessing.Pool(min(15, len(pdb_found))) as p:
        p.map(try_download_pe_routine, pdb_found)
def export_summary_infos(root_pdb_folder):
    """Print a Markdown table comparing every DLL with the PDB next to it.

    The two header rows are always printed. One more row is printed for each
    ``*.dll`` found under ``root_pdb_folder`` that has a ``<name>.pdb`` in the
    same folder; the folder name is reported in the "PDB GUID" column.
    """
    row_template = "|{Version:d}|{FileVersion:s}|0x{TimeDateStamp:x}|{Interpreted_Timestamp:s}|0x{SizeOfImage:x}|0x{PDBSignature:x}|0x{PDBSizeOfImage:x}|{TimeStampDifference:d}|{PDB_GUID:s}|"

    print("| Version | FileVersion | TimeDateStamp | Interpreted timestamp | SizeOfImage | PDBSignature | PDB reconstructed SizeOfImage | Timestamp difference | PDB GUID | ")
    print("| ------------- | ------------- | ------------- | ----------------------| ------------- | ------------- | ----------------------------- | -------------------- | ------------- | ")

    for root, folders, files in os.walk(root_pdb_folder):
        for pe_filename in (entry for entry in files if entry.endswith(".dll")):
            pe_filepath = os.path.join(root, pe_filename)
            containing_folder = os.path.dirname(pe_filepath)
            pe_name = os.path.splitext(os.path.basename(pe_filepath))[0]
            pdb_filepath = os.path.join(containing_folder, "%s.pdb" % pe_name)

            # Only DLLs that sit next to their PDB get a row.
            if not os.path.exists(pdb_filepath):
                continue

            pdb_guid = os.path.basename(containing_folder)

            # Values reconstructed from the PDB.
            pdb_timestamp = extract_timestamp_from_pdb(pdb_filepath)
            pdb_size_of_image = extract_image_size_from_pdb(pdb_filepath)

            # Values read from the PE headers and version resource.
            pe_obj = pefile.PE(pe_filepath)
            pe_size_of_image = pe_obj.OPTIONAL_HEADER.SizeOfImage
            pe_timestamp = pe_obj.FILE_HEADER.TimeDateStamp
            pe_version = get_product_version(pe_obj)
            pe_file_version = pe_obj.FileInfo[0].StringTable[0].entries[b'FileVersion'].decode('ascii')
            pe_obj.close()

            pe_timestamp_as_time = time.strftime(
                '%Y-%m-%d %H:%M:%S',
                time.gmtime(float(pe_timestamp)),
            )

            row = row_template.format(
                Version=pe_version,
                FileVersion=pe_file_version,
                TimeDateStamp=pe_timestamp,
                Interpreted_Timestamp=pe_timestamp_as_time,
                SizeOfImage=pe_size_of_image,
                PDBSignature=pdb_timestamp,
                PDBSizeOfImage=pdb_size_of_image,
                TimeStampDifference=pe_timestamp - pdb_timestamp,
                PDB_GUID=pdb_guid,
                Filepath=pe_filepath,
            )
            print(row)
def verify_downloads(root_pdb_folder):
    """Check that every downloaded DLL matches the GUID of its folder.

    For each ``*.dll`` under ``root_pdb_folder`` that has a ``<name>.pdb`` in
    the same folder, the GUID read from the DLL is compared with the folder
    name and one status line is printed (``+`` match, ``x`` mismatch). A
    mismatching DLL is moved with relocate_dll() under its original file name.
    """
    for root, folders, files in os.walk(root_pdb_folder):
        for pe_filename in files:
            if not pe_filename.endswith(".dll"):
                continue

            pe_filepath = os.path.join(root, pe_filename)
            pe_name, _ = os.path.splitext(os.path.basename(pe_filepath))
            pdb_filename = "%s.pdb" % pe_name
            pdb_filepath = os.path.join(
                os.path.dirname(pe_filepath),
                "%s" % pdb_filename
            )

            if not os.path.exists(pdb_filepath):
                continue

            pdb_guid = os.path.basename(os.path.dirname(pe_filepath))

            pe_obj = pefile.PE(pe_filepath)
            try:
                pe_pdb_GUID = get_guid(pe_obj)
            finally:
                pe_obj.close()

            status = "x" if pdb_guid != pe_pdb_GUID else "+"
            print("[%s] %s (%s == %s)" % (status, pe_filepath, pdb_guid, pe_pdb_GUID))

            if pe_pdb_GUID is None:
                # No GUID could be read: there is no folder to relocate to.
                continue

            if pdb_guid != pe_pdb_GUID:
                # Pass the full file name: the extension-less `pe_name` would
                # make the relocated file lose its ".dll" suffix.
                relocate_dll(pe_filepath, pe_filename)
if __name__ == '__main__':
    # Script entry point: `--pdb <root> DOWNLOAD | EXPORT | VERIFY`.
    logging.getLogger('requests').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)

    # The first positional parameter of ArgumentParser is `prog`; pass the
    # text as `description` so the usage line keeps the real program name.
    arg_parser = argparse.ArgumentParser(description="download pe from PDB file.")
    # Every command walks this folder, so it cannot be omitted.
    arg_parser.add_argument("--pdb", type=str, required=True, help="path to root pdb folder")
    arg_parser.add_argument("-v", "--verbose", action="store_true", help="verbose output : activate debug logging.")

    subparsers = arg_parser.add_subparsers(dest='command')
    download_parser = subparsers.add_parser('DOWNLOAD')
    # NOTE(review): --url is parsed but not forwarded; bulk_download() always
    # queries MICROSOFT_SYMBOL_STORE.
    download_parser.add_argument("--url", type=str, help="url to symbol store")
    export_parser = subparsers.add_parser('EXPORT')
    verify_parser = subparsers.add_parser('VERIFY')

    args = arg_parser.parse_args()

    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.INFO)

    if args.command == 'DOWNLOAD':
        bulk_download(args.pdb, pe_filename="ntdll.dll")
    elif args.command == 'EXPORT':
        export_summary_infos(args.pdb)
    elif args.command == 'VERIFY':
        verify_downloads(args.pdb)
    else:
        # Missing sub-command: print the usage and exit with status 2.
        arg_parser.error("unsupported command : %s" % args.command)

    # The statements that used to follow this exit were unreachable debug
    # code with hard-coded local paths and have been removed.
    sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment