Forked from lucasg/download_pdb_database.py by @valkheim (created July 9, 2021).

Download PDB and PE files from the Microsoft symbol store.
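Both scripts build symbol-server paths by hand: a PDB lives under <store>/<pdbname>/<GUID+Age>/<pdbname>, and a PE under <store>/<filename>/<TimeDateStamp><SizeOfImage>/<filename>, with the last two values rendered as upper-case hex. A minimal sketch of the two shapes (the GUID+Age is reused from the test path in the second script; the timestamp and image size are made-up placeholders):

store = "https://msdl.microsoft.com/download/symbols"
pdb_url = "/".join([store, "ntdll.pdb", "B7803D0EC5D54691BDF74A72B4988B401", "ntdll.pdb"])
pe_url = "{}/{}/{:X}{:X}/{}".format(store, "ntdll.dll", 0x5F1D2A3B, 0x1F2000, "ntdll.dll")

The first script below downloads PDBs for a list of GUID+Age strings; the second reconstructs the TimeDateStamp and SizeOfImage from each downloaded PDB and fetches the matching PE.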
import os
import re
import sys
import logging
import argparse
import subprocess
import requests
MICROSOFT_SYMBOL_STORE = "https://msdl.microsoft.com/download/symbols"

def try_download_pdb(url, filename, guid, output_filename):
    """ Try to fetch <url>/<filename>/<guid>/<filename> and save it to output_filename """
    pdb_url = "{url:s}/{filename:s}/{guid:s}/{filename:s}".format(
        url=url,
        guid=guid,
        filename=filename
    )

    logging.debug("[-] testing url : %s" % pdb_url)
    response = requests.get(pdb_url, stream=True)
    if response.status_code != 200:
        logging.warning("[x] pdb not found at url : %s" % pdb_url)
        return response.status_code

    logging.info("[+] found pdb at url : %s" % pdb_url)
    os.makedirs(os.path.dirname(output_filename), exist_ok=True)
    with open(output_filename, 'wb') as f:
        # stream the download to disk in 32 KiB chunks
        for data in response.iter_content(32*1024):
            f.write(data)

    return response.status_code

if __name__ == '__main__':
    logging.getLogger('requests').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)
    # logging.getLogger().setLevel(logging.DEBUG)

    arg_parser = argparse.ArgumentParser("download pdb files from the Microsoft symbol store.")
    arg_parser.add_argument("--name", type=str, help="pdb filename")
    arg_parser.add_argument("--pdb", type=str, help="path to input pdb GUID file")
    arg_parser.add_argument("--dir", type=str, help="path to root output directory")
    arg_parser.add_argument("-v", "--verbose", action="store_true", help="verbose output: activate debug logging.")
    args = arg_parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        logging.getLogger().setLevel(logging.INFO)

    # the input file holds one GUID+Age string per line (any whitespace separator works)
    with open(args.pdb, "r") as pdb_guids_fd:
        pdb_guids = pdb_guids_fd.read()

    for pdb_guid in pdb_guids.split():
        output_filename = os.path.join(
            args.dir,
            pdb_guid,
            args.name
        )
        # skip files that have already been downloaded
        # if os.path.exists(output_filename):
        #     continue
        try_download_pdb(
            MICROSOFT_SYMBOL_STORE,
            args.name,
            pdb_guid,
            output_filename
        )
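
A minimal usage sketch for the script above; the GUID list file and output directory are placeholders, and the command assumes the script keeps the forked gist's file name:

# Command line (placeholder names):
#   python download_pdb_database.py --name ntdll.pdb --pdb guids.txt --dir symbols -v
# where guids.txt holds one GUID+Age string per line. The helper can also be called directly:
status = try_download_pdb(
    MICROSOFT_SYMBOL_STORE,
    "ntdll.pdb",
    "B7803D0EC5D54691BDF74A72B4988B401",  # GUID+Age reused from the test path further down
    os.path.join("symbols", "B7803D0EC5D54691BDF74A72B4988B401", "ntdll.pdb"),
)  # returns the HTTP status code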
import os
import re
import sys
import argparse
import subprocess
import logging
import time
import struct
import multiprocessing
import shutil
import random
import requests
import yaml
import pefile
MICROSOFT_SYMBOL_STORE = "https://msdl.microsoft.com/download/symbols"
PAGE_SIZE = 4*1024

def LOWORD(dword):
    return dword & 0x0000ffff


def HIWORD(dword):
    return dword >> 16

def get_product_version(pe):
    # https://stackoverflow.com/a/16076661/1741450
    ms = pe.VS_FIXEDFILEINFO.ProductVersionMS
    ls = pe.VS_FIXEDFILEINFO.ProductVersionLS
    # return (HIWORD(ms), LOWORD(ms), HIWORD(ls), LOWORD(ls))
    return (ms << 32) + ls
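
# Worked example (illustrative values): for product version 10.0.19041.546, ProductVersionMS
# packs major.minor as (10 << 16) | 0 = 0x000A0000 and ProductVersionLS packs build.revision
# as (19041 << 16) | 546 = 0x4A610222, so get_product_version() returns 0x000A00004A610222;
# HIWORD()/LOWORD() recover the four individual fields.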

def get_guid(dll):
    # https://gist.github.com/steeve85/2665503
    # ugly code, isn't it?
    try:
        # dll = pefile.PE(dll_path)
        # AddressOfRawData points to the CodeView record: "RSDS" signature, 16-byte GUID, then Age
        rva = dll.DIRECTORY_ENTRY_DEBUG[0].struct.AddressOfRawData
        tmp = ''
        tmp += '%0.*X' % (8, dll.get_dword_at_rva(rva+4))
        tmp += '%0.*X' % (4, dll.get_word_at_rva(rva+4+4))
        tmp += '%0.*X' % (4, dll.get_word_at_rva(rva+4+4+2))
        # the last 8 GUID bytes are emitted in raw byte order, so each little-endian word is swapped
        x = dll.get_word_at_rva(rva+4+4+2+2)
        tmp += '%0.*X' % (4, struct.unpack('<H', struct.pack('>H', x))[0])
        x = dll.get_word_at_rva(rva+4+4+2+2+2)
        tmp += '%0.*X' % (4, struct.unpack('<H', struct.pack('>H', x))[0])
        x = dll.get_word_at_rva(rva+4+4+2+2+2+2)
        tmp += '%0.*X' % (4, struct.unpack('<H', struct.pack('>H', x))[0])
        x = dll.get_word_at_rva(rva+4+4+2+2+2+2+2)
        tmp += '%0.*X' % (4, struct.unpack('<H', struct.pack('>H', x))[0])
        # trailing Age value
        tmp += '%0.*X' % (1, dll.get_word_at_rva(rva+4+4+2+2+2+2+2+2))
    except AttributeError as e:
        # print('Error happened during %s parsing' % dll_path)
        print(e)
        return None
    return tmp.upper()
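
# The returned string is the GUID+Age identifier used as the folder name on the symbol server
# (32 hex digits of the GUID followed by the Age), e.g. B7803D0EC5D54691BDF74A72B4988B401 in the
# test path at the bottom of this script.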

def extract_timestamp_from_pdb(pdb_file):
    """ Extract the PdbStream signature (which is the PE "timestamp") from a pdb """
    pdbutil_path = os.path.join(
        os.path.abspath(os.path.dirname(__file__)),
        "llvm-pdbutil-x64.exe"
    )
    command = '{pdbutil:s} pdb2yaml -pdb-stream "{path:s}"'.format(
        pdbutil=pdbutil_path,
        path=pdb_file
    )

    output = subprocess.check_output(command)
    pdb_stream = yaml.safe_load(output)  # safe_load: yaml.load() without a Loader is deprecated
    return pdb_stream['PdbStream']['Signature']
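
# Note: llvm-pdbutil-x64.exe (and cvdump.exe below) are expected to sit next to this script;
# "pdb2yaml -pdb-stream" dumps the PDB info stream as YAML, from which the Signature field is read.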

def extract_image_size_from_pdb(pdb_file):
    """ Extract the PE section headers from a pdb file and try to recompute the image size """
    cvdump_path = os.path.join(
        os.path.abspath(os.path.dirname(__file__)),
        "cvdump.exe"
    )
    command = '{cvdump:s} -headers "{path:s}"'.format(
        cvdump=cvdump_path,
        path=pdb_file
    )

    # dump PE sections in one text block
    output = subprocess.check_output(command)
    output = output.decode('ascii')

    # locate section headers information
    idx_section_headers = output.find('*** SECTION HEADERS')
    idx_orig_section_headers = output.find('*** ORIGINAL SECTION HEADERS')
    section_headers = output[idx_section_headers:idx_orig_section_headers]
    sections_headers = re.split(r"SECTION HEADER #", section_headers)[1:]

    header_regex = re.compile("\r\n".join([
        r"(\d+)",
        r" *([\.\w]+) name",
        r" *([0-9A-F]+) virtual size",
        r" *([0-9A-F]+) virtual address",
        r" *([0-9A-F]+) size of raw data",
        r" *([0-9A-F]+) file pointer to raw data",
        r" *([0-9A-F]+) file pointer to relocation table",
        r" *([0-9A-F]+) file pointer to line numbers",
        r" *([0-9A-F]+) number of relocations",
        r" *([0-9A-F]+) number of line numbers",
        r" *([0-9A-F]+) flags",
    ]), re.MULTILINE)

    # Parse every section header and return the max vaddr present
    max_virtual_address = 0
    for hdr in sections_headers:
        header = header_regex.match(hdr)
        hid, name, vsize, vaddr, rdsize, rd_fp, rt_fp, ln_fp, reloc_number, ln_number, flags = header.groups()
        vaddr_end_section = int(vaddr, 16) + int(vsize, 16)
        max_virtual_address = max(max_virtual_address, vaddr_end_section)

    # align max_virtual_address on a page size
    max_virtual_address = (max_virtual_address + PAGE_SIZE) - (max_virtual_address % PAGE_SIZE)
    return max_virtual_address
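
# The symbol-server path component for a PE is <TimeDateStamp><SizeOfImage> in upper-case hex,
# so SizeOfImage is rebuilt from the PDB's section headers: the end of the highest section,
# rounded up to a 4 KiB page. Illustrative example: a last section at virtual address 0x1F0000
# with virtual size 0x1234 ends at 0x1F1234, giving a reconstructed SizeOfImage of 0x1F2000.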

def relocate_dll(filepath, filename):
    """ Move a downloaded PE into a sibling folder named after its own PDB GUID+Age """
    pe_obj = pefile.PE(filepath)
    pe_pdb_GUID = get_guid(pe_obj)
    pe_obj.close()

    new_GUID_folder = os.path.join(
        os.path.dirname(os.path.dirname(filepath)),
        pe_pdb_GUID,
    )
    new_GUID_dll = os.path.join(new_GUID_folder, filename)
    os.makedirs(new_GUID_folder, exist_ok=True)
    shutil.move(filepath, new_GUID_dll)

def try_download_pe(url, filename, timestamp, image_size, output_filename):
    pe_url = "{url:s}/{filename:s}/{timestamp:s}{image_size:s}/{filename:s}".format(
        url=url,
        filename=filename,
        timestamp="%X" % timestamp,
        image_size="%X" % image_size,
    )

    # print("[-] testing url : %s" % pe_url)
    for test_try in range(0, 5):  # retry a few times on transient connection errors
        try:
            response = requests.get(pe_url, stream=True)
            if response.status_code != 200:
                return response.status_code

            with open(output_filename, 'wb') as f:
                for data in response.iter_content(32*1024):
                    f.write(data)

            return response.status_code
        except requests.exceptions.ConnectionError as ce:
            pass
    return None  # all retries failed

def try_download_pe_routine(params):
    """ multiprocessing.Pool helper: unpack a parameters tuple and run the fuzzy download """
    url, filename, timestamp, image_size, output_filename = params
    try_download_pe_fuzzy_timestamp(url, filename, timestamp, image_size, output_filename)

def try_download_pe_fuzzy_timestamp(url, filename, timestamp, image_size, output_filename):
    # fuzz_range = 15000
    fuzz_range = 5000

    for test_timestamp in range(timestamp - 10, timestamp + fuzz_range):
        status_code = try_download_pe(url, filename, test_timestamp, image_size, output_filename)
        if status_code == 200:
            pdb_guid = os.path.basename(os.path.dirname(output_filename))

            pe_obj = pefile.PE(output_filename)
            pe_pdb_GUID = get_guid(pe_obj)
            pe_obj.close()

            if pdb_guid != pe_pdb_GUID:
                print("[x] Found matching fuzzy timestamp (%x) but the GUIDs don't match (%s != %s)" % (test_timestamp, pdb_guid, pe_pdb_GUID))
                relocate_dll(output_filename, filename)
                return False
            else:
                print("[!] Found matching fuzzy timestamp %s : %x" % (output_filename, test_timestamp))
                return True

    print("[x] Could not find a matching timestamp in : [%x, %x] for %s" % (timestamp - 10, timestamp + fuzz_range, output_filename))
    return False

def bulk_download(root_pdb_folder, pe_filename=None):
    pdb_found = []

    # enumerate all the pdb files without an associated pe
    for root, folders, files in os.walk(root_pdb_folder):
        for pdb_filename in filter(lambda pfile: pfile.endswith(".pdb"), files):
            pdb_filepath = os.path.join(root, pdb_filename)
            pdb_filename, _ = os.path.splitext(os.path.basename(pdb_filepath))

            if not pe_filename:
                pe_filename = "%s.dll" % pdb_filename

            pe_filepath = os.path.join(
                os.path.dirname(pdb_filepath),
                "%s" % pe_filename
            )
            if os.path.exists(pe_filepath):
                continue

            try:
                pdb_timestamp = extract_timestamp_from_pdb(pdb_filepath)
                pdb_size_of_image = extract_image_size_from_pdb(pdb_filepath)
            except subprocess.CalledProcessError as cpe:
                continue

            logging.info("[-] Found pdb file : %s (%x %x)" % (pdb_filepath, pdb_timestamp, pdb_size_of_image))
            pdb_found.append((MICROSOFT_SYMBOL_STORE, pe_filename, pdb_timestamp, pdb_size_of_image, pe_filepath))

    # download the missing PE files in parallel, in random order
    random.shuffle(pdb_found)
    with multiprocessing.Pool(15) as p:
        p.map(try_download_pe_routine, pdb_found)
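
# Expected layout under root_pdb_folder: one <GUID+Age>/ subfolder per PDB, as produced by the
# first script (e.g. <root>/B7803D0EC5D54691BDF74A72B4988B401/ntdll.pdb); the matching PE is
# written next to its PDB.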

def export_summary_infos(root_pdb_folder):
    """ Print a summary table (markdown formatted) for every downloaded PE/pdb pair """
    # print("Version;FileVersion;TimeDateStamp;Interpreted timestamp;SizeOfImage;PDBSignature;PDB reconstructed SizeOfImage;Timestamp difference;PDB GUID;Filepath")
    print("| Version | FileVersion | TimeDateStamp | Interpreted timestamp | SizeOfImage | PDBSignature | PDB reconstructed SizeOfImage | Timestamp difference | PDB GUID | ")
    print("| ------------- | ------------- | ------------- | ----------------------| ------------- | ------------- | ----------------------------- | -------------------- | ------------- | ")

    for root, folders, files in os.walk(root_pdb_folder):
        for pe_filename in filter(lambda pfile: pfile.endswith(".dll"), files):
            pe_filepath = os.path.join(root, pe_filename)
            pe_name, _ = os.path.splitext(os.path.basename(pe_filepath))

            pdb_filename = "%s.pdb" % pe_name
            pdb_filepath = os.path.join(
                os.path.dirname(pe_filepath),
                "%s" % pdb_filename
            )
            if os.path.exists(pdb_filepath):
                pdb_guid = os.path.basename(os.path.dirname(pe_filepath))
                pdb_timestamp = extract_timestamp_from_pdb(pdb_filepath)
                pdb_size_of_image = extract_image_size_from_pdb(pdb_filepath)

                pe_obj = pefile.PE(pe_filepath)
                pe_size_of_image = pe_obj.OPTIONAL_HEADER.SizeOfImage
                pe_timestamp = pe_obj.FILE_HEADER.TimeDateStamp
                pe_version = get_product_version(pe_obj)
                pe_file_version = pe_obj.FileInfo[0].StringTable[0].entries[b'FileVersion'].decode('ascii')
                pe_obj.close()

                pe_timestamp_as_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(float(pe_timestamp)))

                print(
                    # "\"{Version:d}\";\"{FileVersion:s}\";{TimeDateStamp:d};\"{Interpreted_Timestamp:s}\";{SizeOfImage:d};{PDBSignature:d};{PDBSizeOfImage:d};{TimeStampDifference:d};\"{PDB_GUID:s}\";\"{Filepath:s}\"".format(
                    "|{Version:d}|{FileVersion:s}|0x{TimeDateStamp:x}|{Interpreted_Timestamp:s}|0x{SizeOfImage:x}|0x{PDBSignature:x}|0x{PDBSizeOfImage:x}|{TimeStampDifference:d}|{PDB_GUID:s}|".format(
                        Version=pe_version,
                        FileVersion=pe_file_version,
                        TimeDateStamp=pe_timestamp,
                        Interpreted_Timestamp=pe_timestamp_as_time,
                        SizeOfImage=pe_size_of_image,
                        PDBSignature=pdb_timestamp,
                        PDBSizeOfImage=pdb_size_of_image,
                        TimeStampDifference=pe_timestamp - pdb_timestamp,
                        PDB_GUID=pdb_guid,
                        Filepath=pe_filepath
                    ))

def verify_downloads(root_pdb_folder):
    """ Check that every downloaded PE references the PDB GUID+Age of the folder it sits in """
    for root, folders, files in os.walk(root_pdb_folder):
        for pe_filename in filter(lambda pfile: pfile.endswith(".dll"), files):
            pe_filepath = os.path.join(root, pe_filename)
            pe_name, _ = os.path.splitext(os.path.basename(pe_filepath))

            pdb_filename = "%s.pdb" % pe_name
            pdb_filepath = os.path.join(
                os.path.dirname(pe_filepath),
                "%s" % pdb_filename
            )
            if not os.path.exists(pdb_filepath):
                continue

            pdb_guid = os.path.basename(os.path.dirname(pe_filepath))

            pe_obj = pefile.PE(pe_filepath)
            pe_pdb_GUID = get_guid(pe_obj)
            pe_obj.close()

            status = ("+", "x")[pdb_guid != pe_pdb_GUID]
            print("[%s] %s (%s == %s)" % (status, pe_filepath, pdb_guid, pe_pdb_GUID))

            if pdb_guid != pe_pdb_GUID:
                # pass the full filename (with extension) so the relocated file keeps its .dll suffix
                relocate_dll(pe_filepath, pe_filename)

if __name__ == '__main__':
    logging.getLogger('requests').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)
    # logging.getLogger().setLevel(logging.DEBUG)
    logging.getLogger().setLevel(logging.INFO)

    arg_parser = argparse.ArgumentParser("download pe from PDB file.")
    arg_parser.add_argument("--pdb", type=str, help="path to root pdb folder")
    arg_parser.add_argument("-v", "--verbose", action="store_true", help="verbose output: activate debug logging.")

    subparsers = arg_parser.add_subparsers(dest='command')
    download_parser = subparsers.add_parser('DOWNLOAD')
    download_parser.add_argument("--url", type=str, help="url to symbol store")
    export_parser = subparsers.add_parser('EXPORT')
    verify_parser = subparsers.add_parser('VERIFY')
    args = arg_parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        logging.getLogger().setLevel(logging.INFO)

    if args.command == 'DOWNLOAD':
        bulk_download(args.pdb, pe_filename="ntdll.dll")
    elif args.command == 'EXPORT':
        export_summary_infos(args.pdb)
    elif args.command == 'VERIFY':
        verify_downloads(args.pdb)
    else:
        raise ValueError("unsupported command : %s" % args.command)

    sys.exit(0)
    # unreachable manual test snippet (left after the sys.exit(0) above)
    bulk_download(r"F:\Dev\pypdb\tests\ntdll.pdb", pe_filename="ntdll.dll")
    sys.exit(0)

    pdb_filepath = r"F:\Dev\pypdb\tests\ntdll.pdb\B7803D0EC5D54691BDF74A72B4988B401\ntdll.pdb"
    pdb_filename, _ = os.path.splitext(os.path.basename(pdb_filepath))
    pe_filename = "%s.dll" % pdb_filename
    pe_filepath = os.path.join(
        os.path.dirname(pdb_filepath),
        "%s" % pe_filename
    )

    pdb_timestamp = extract_timestamp_from_pdb(pdb_filepath)
    pdb_size_of_image = extract_image_size_from_pdb(pdb_filepath)
    try_download_pe_fuzzy_timestamp(MICROSOFT_SYMBOL_STORE, pe_filename, pdb_timestamp, pdb_size_of_image, pe_filepath)