Last active
August 13, 2025 07:41
-
-
Save UserUnknownFactor/465182a34594090246bda1f2e4a4724f to your computer and use it in GitHub Desktop.
Full Python port of NTFS File Information Utility
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
NTFS File Information Utility | |
Dumps information about an NTFS volume, and optionally determines | |
which volume and file contains a particular sector and vice versa. | |
""" | |
import ctypes | |
from ctypes import wintypes | |
import os | |
import sys | |
import struct | |
from enum import IntEnum, Enum | |
from dataclasses import dataclass | |
from typing import Optional, Tuple, List, Dict, Union | |
from pathlib import Path | |
import logging | |
logging.basicConfig(level=logging.INFO, format='%(message)s') | |
logger = logging.getLogger(__name__) | |
# Type definitions
# Short aliases, named after their Win32 counterparts, for the ctypes types
# used by the structure and prototype declarations in this module.

# Characters, strings and raw pointers
WCHAR = wintypes.WCHAR
LPCWSTR = wintypes.LPCWSTR
LPWSTR = wintypes.LPWSTR
LPCSTR = wintypes.LPCSTR
LPCVOID = wintypes.LPCVOID
FARPROC = wintypes.LPCVOID
PVOID = ctypes.c_void_p
POINTER = ctypes.POINTER

# Handles
HANDLE = wintypes.HANDLE
HMODULE = wintypes.HMODULE

# Integers and booleans
BYTE = wintypes.BYTE
WORD = wintypes.WORD
DWORD = wintypes.DWORD
BOOL = wintypes.BOOL
LONG = wintypes.LONG
ULONG = wintypes.ULONG
LARGE_INTEGER = ctypes.c_int64
ULARGE_INTEGER = ctypes.c_uint64

# Unsigned integer as wide as a pointer: 64-bit when c_void_p occupies
# 8 bytes, 32-bit otherwise.
ULONG_PTR = (
    ctypes.c_uint64 if ctypes.sizeof(ctypes.c_void_p) == 8 else ctypes.c_uint32
)
class WindowsConstants(IntEnum):
    """Windows API constants used when opening and querying files/volumes.

    Several members share a numeric value (for example ``GENERIC_READ`` and
    ``FILE_FLAG_WRITE_THROUGH`` are both ``0x80000000``).  ``IntEnum`` turns
    the later name into an alias of the earlier one; the integer value, which
    is all the API calls consume, is unaffected.
    """
    GENERIC_READ = 0x80000000
    FILE_SHARE_READ = 0x00000001
    FILE_SHARE_WRITE = 0x00000002
    OPEN_EXISTING = 0x00000003
    FILE_ATTRIBUTE_NORMAL = 0x00000080
    FILE_FLAG_NO_BUFFERING = 0x20000000
    FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
    FILE_FLAG_WRITE_THROUGH = 0x80000000
    # Functions declared with ``restype = HANDLE`` (a ``c_void_p``) hand the
    # handle back as an *unsigned* Python int, so (HANDLE)-1 arrives as
    # 0xFFFFFFFF / 0xFFFFFFFFFFFFFFFF and never compares equal to -1.  Use
    # the pointer-sized all-ones value so ``handle == INVALID_HANDLE_VALUE``
    # actually detects failure.
    INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
    # File Information Classes
    FILE_INTERNAL_INFORMATION = 6
    FILE_NAME_INFORMATION = 9
    # Object Attributes
    OBJ_CASE_INSENSITIVE = 0x00000040
    # Access Masks
    FILE_READ_ATTRIBUTES = 0x0080
    SYNCHRONIZE = 0x00100000
    FILE_LIST_DIRECTORY = 0x0001
    FILE_SYNCHRONOUS_IO_NONALERT = 0x00000020
class NTFSConstants(IntEnum):
    """NTFS-specific constants: control codes passed to the device/file-system
    control calls in this module, plus the NT status values it names."""
    # FSCTL codes
    FSCTL_GET_NTFS_VOLUME_DATA = 0x00090064
    FSCTL_GET_NTFS_FILE_RECORD = 0x00090068
    # NOTE(review): 0x0009006F appears to be the code winioctl.h assigns to
    # FSCTL_GET_VOLUME_BITMAP; FSCTL_QUERY_ALLOCATED_RANGES is normally
    # 0x000940CF.  Not referenced in the visible part of the file - confirm
    # against the caller before relying on it.
    FSCTL_QUERY_ALLOCATED_RANGES = 0x0009006F
    # IOCTL codes
    IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS = 0x00560000
    # NOTE(review): 0x00560024 appears to be IOCTL_VOLUME_PHYSICAL_TO_LOGICAL
    # (LOGICAL_TO_PHYSICAL is normally 0x00560020) - TODO confirm which
    # direction the caller actually needs.
    IOCTL_VOLUME_LOGICAL_TO_PHYSICAL = 0x00560024
    IOCTL_DISK_GET_DRIVE_GEOMETRY = 0x00070000
    IOCTL_DISK_GET_PARTITION_INFO = 0x00074004
    IOCTL_STORAGE_GET_DEVICE_NUMBER = 0x002D1080
    # NT Status codes
    STATUS_SUCCESS = 0x00000000
    STATUS_BUFFER_OVERFLOW = 0x80000005
class NTFSAttributeType(IntEnum):
    """NTFS attribute type codes as stored in an attribute record header."""
    UNUSED = 0x00
    STANDARD_INFORMATION = 0x10
    ATTRIBUTE_LIST = 0x20
    FILE_NAME = 0x30
    OBJECT_ID = 0x40
    SECURITY_DESCRIPTOR = 0x50
    VOLUME_NAME = 0x60
    VOLUME_INFORMATION = 0x70
    DATA = 0x80
    INDEX_ROOT = 0x90
    INDEX_ALLOCATION = 0xA0
    BITMAP = 0xB0
    REPARSE_POINT = 0xC0
    EA_INFORMATION = 0xD0
    EA = 0xE0

    def get_name(self) -> str:
        """Return the display name for this attribute type, e.g. ``"$DATA"``.

        Type 0xC0 is reported as ``"$SYMBOLIC_LINK"`` when running on a
        Windows major version of 4 or lower and as ``"$REPARSE_POINT"``
        everywhere else.  The Windows version is only consulted for that one
        member and only where ``sys.getwindowsversion`` exists, so the method
        also works when records are inspected on another platform.
        """
        if self is NTFSAttributeType.REPARSE_POINT:
            get_version = getattr(sys, "getwindowsversion", None)
            if get_version is not None and get_version().major <= 4:
                return "$SYMBOLIC_LINK"
        # Every display name is the member identifier prefixed with "$".
        return f"${self.name}"
class SystemFileIndex(IntEnum):
    """MFT record numbers of the NTFS system (metadata) files, 0 through 15."""
    MFT = 0
    MFT_MIRR = 1
    LOG_FILE = 2
    VOLUME = 3
    ATTR_DEF = 4
    ROOT = 5
    BITMAP = 6
    BOOT = 7
    BAD_CLUS = 8
    SECURE = 9
    UPCASE = 10
    EXTENDED = 11
    RESERVED_12 = 12
    RESERVED_13 = 13
    RESERVED_14 = 14
    RESERVED_15 = 15

    def get_name(self) -> str:
        """Return the display name for this system file, e.g. ``"$MFT"``.

        Record 9 is reported as ``"$Quota"`` when running on a Windows major
        version of 4 or lower and as ``"$Secure"`` everywhere else.  The
        Windows version is only consulted for that one member and only where
        ``sys.getwindowsversion`` exists, so the method also works on other
        platforms.
        """
        if self is SystemFileIndex.SECURE:
            get_version = getattr(sys, "getwindowsversion", None)
            if get_version is not None and get_version().major <= 4:
                return "$Quota"
        # Indexed by record number; the members cover 0..15 without gaps.
        display_names = (
            "$MFT",
            "$MFTMirr",
            "$LogFile",
            "$Volume",
            "$AttrDef",
            "$Root",
            "$Bitmap",
            "$Boot",
            "$BadClus",
            "$Secure",
            "$UpCase",
            "$Extended",
            "$Reserved12",
            "$Reserved13",
            "$Reserved14",
            "$Reserved15",
        )
        return display_names[self.value]
@dataclass
class VolumeInfo:
    """Information about an NTFS volume.

    Populated by ``NTFSVolume._load_volume_info`` from the
    ``NTFS_VOLUME_DATA_BUFFER`` returned by ``FSCTL_GET_NTFS_VOLUME_DATA``.
    """
    # Open volume handle (the same handle NTFSVolume keeps in ``_handle``).
    handle: HANDLE
    # Upper-cased drive letter the volume was opened with.
    drive_letter: str
    # Geometry values copied / derived from the volume data buffer;
    # sectors_per_cluster is bytes_per_cluster // bytes_per_sector.
    bytes_per_sector: int
    sectors_per_cluster: int
    bytes_per_cluster: int
    bytes_per_file_record: int
    # Logical cluster number at which the MFT starts.
    mft_start_lcn: int
    # MftValidDataLength // BytesPerFileRecordSegment.
    mft_total_records: int
    # Reusable output buffer for FSCTL_GET_NTFS_FILE_RECORD and its size in
    # bytes (one file record segment plus 16 bytes).
    buffer: ctypes.Array
    buffer_size: int
@dataclass
class PhysicalToLogicalMapping:
    """Maps a physical sector to a logical sector on a volume"""
    # Single drive letter of the volume found to contain the sector.
    drive_letter: str
    # Sector number relative to the start of the matching partition/extent
    # (physical_sector minus that start sector).
    logical_sector: int
    # The physical sector number that was looked up.
    physical_sector: int
class NTFSError(Exception):
    """Root of the exception hierarchy raised by the NTFS helpers."""


class VolumeOpenError(NTFSError):
    """A volume could not be opened or queried."""


class FileRecordError(NTFSError):
    """A file could not be opened or its MFT record could not be read."""
# Windows API structures | |
class IO_STATUS_BLOCK(ctypes.Structure):
    """Status block passed by reference to the Nt* calls in this module
    (NtOpenFile, NtQueryInformationFile, NtFsControlFile,
    NtDeviceIoControlFile)."""
    _fields_ = [
        ('Status', LONG),
        # Pointer-sized, see the ULONG_PTR alias.
        ('Information', ULONG_PTR),
    ]
class UNICODE_STRING(ctypes.Structure):
    """String descriptor filled in by ``RtlInitUnicodeString`` and referenced
    from ``OBJECT_ATTRIBUTES.ObjectName``."""
    _fields_ = [
        ('Length', WORD),
        ('MaximumLength', WORD),
        # Points at the caller's character buffer - presumably not a copy, so
        # that buffer must outlive this structure (TODO confirm).
        ('Buffer', LPWSTR),
    ]
class OBJECT_ATTRIBUTES(ctypes.Structure):
    """Object description passed by reference to ``NtOpenFile``."""
    _fields_ = [
        # Callers set this to ctypes.sizeof(OBJECT_ATTRIBUTES).
        ('Length', ULONG),
        ('RootDirectory', HANDLE),
        # Untyped pointer; callers store the address of a UNICODE_STRING here.
        ('ObjectName', PVOID),
        ('Attributes', ULONG),
        ('SecurityDescriptor', PVOID),
        ('SecurityQualityOfService', PVOID),
    ]
class DISK_GEOMETRY(ctypes.Structure):
    """Output buffer for ``IOCTL_DISK_GET_DRIVE_GEOMETRY``.

    The visible code in this module only reads ``BytesPerSector``.
    """
    _fields_ = [
        ("Cylinders", LARGE_INTEGER),
        ("MediaType", DWORD),
        ("TracksPerCylinder", DWORD),
        ("SectorsPerTrack", DWORD),
        ("BytesPerSector", DWORD),
    ]
class PARTITION_INFORMATION(ctypes.Structure):
    """Output buffer for ``IOCTL_DISK_GET_PARTITION_INFO``.

    ``StartingOffset`` and ``PartitionLength`` are byte counts; the legacy
    volume lookup divides them by the sector size to obtain sector numbers.
    """
    _fields_ = [
        ("StartingOffset", LARGE_INTEGER),
        ("PartitionLength", LARGE_INTEGER),
        ("HiddenSectors", DWORD),
        ("PartitionNumber", DWORD),
        ("PartitionType", BYTE),
        # The three flags are one-byte BOOLEANs in the Win32 definition, not
        # four-byte BOOLs; declaring them as BOOL shifts their offsets and
        # inflates the structure beyond what the driver fills in.
        ("BootIndicator", wintypes.BOOLEAN),
        ("RecognizedPartition", wintypes.BOOLEAN),
        ("RewritePartition", wintypes.BOOLEAN),
    ]
class VOLUME_DISK_EXTENT(ctypes.Structure):
    """One extent entry of a ``VOLUME_DISK_EXTENTS`` result.

    The modern volume lookup reads these entries straight from the raw output
    buffer: disk number at +0, starting offset at +8, extent length at +16,
    24 bytes per entry.
    """
    _fields_ = [
        ("DiskNumber", DWORD),
        ("StartingOffset", LARGE_INTEGER),
        ("ExtentLength", LARGE_INTEGER),
    ]
class VOLUME_DISK_EXTENTS(ctypes.Structure):
    """Header of the ``IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS`` output."""
    _fields_ = [
        ("NumberOfDiskExtents", DWORD),
        # Declared with a single element; the visible code treats the real
        # result as NumberOfDiskExtents entries starting at byte offset 8.
        ("Extents", VOLUME_DISK_EXTENT * 1),
    ]
class NTFS_VOLUME_DATA_BUFFER(ctypes.Structure):
    """Output buffer for ``FSCTL_GET_NTFS_VOLUME_DATA``.

    ``NTFSVolume._load_volume_info`` reads BytesPerSector, BytesPerCluster,
    BytesPerFileRecordSegment, MftValidDataLength and MftStartLcn from it.
    """
    _fields_ = [
        ("VolumeSerialNumber", LARGE_INTEGER),
        ("NumberSectors", LARGE_INTEGER),
        ("TotalClusters", LARGE_INTEGER),
        ("FreeClusters", LARGE_INTEGER),
        ("TotalReserved", LARGE_INTEGER),
        ("BytesPerSector", DWORD),
        ("BytesPerCluster", DWORD),
        ("BytesPerFileRecordSegment", DWORD),
        ("ClustersPerFileRecordSegment", DWORD),
        ("MftValidDataLength", LARGE_INTEGER),
        ("MftStartLcn", LARGE_INTEGER),
        ("Mft2StartLcn", LARGE_INTEGER),
        ("MftZoneStart", LARGE_INTEGER),
        ("MftZoneEnd", LARGE_INTEGER),
    ]
class NTFS_FILE_RECORD_INPUT_BUFFER(ctypes.Structure):
    """Input buffer for ``FSCTL_GET_NTFS_FILE_RECORD``; carries the MFT record
    number requested by ``NTFSVolume.read_mft_record``."""
    _fields_ = [
        ("FileReferenceNumber", LARGE_INTEGER),
    ]
class FILE_INTERNAL_INFORMATION(ctypes.Structure):
    """Output buffer for ``NtQueryInformationFile`` with information class
    ``WindowsConstants.FILE_INTERNAL_INFORMATION``."""
    _fields_ = [
        # NTFSAnalyzer.analyze_file keeps only the low 48 bits of this value
        # and uses them as the MFT record number.
        ("IndexNumber", LARGE_INTEGER),
    ]
class FILE_ALLOCATED_RANGE_BUFFER(ctypes.Structure):
    """A (file offset, length) pair.

    Presumably the buffer exchanged with ``FSCTL_QUERY_ALLOCATED_RANGES``;
    it is not used in the part of the file visible here - verify against
    the caller.
    """
    _fields_ = [
        ("FileOffset", LARGE_INTEGER),
        ("Length", LARGE_INTEGER),
    ]
class WindowsAPI:
    """Loads kernel32, ntdll and user32 and declares the prototypes of every
    function this module calls through ctypes.

    After construction ``volume_functions_available`` tells whether the
    volume enumeration functions (``FindFirstVolumeW`` and friends) were
    found in kernel32.
    """

    def __init__(self):
        self.kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
        self.ntdll = ctypes.WinDLL('ntdll', use_last_error=True)
        self.user32 = ctypes.WinDLL('user32', use_last_error=True)
        self._setup_prototypes()
        self._check_volume_functions()

    def _setup_prototypes(self):
        """Declare argument and return types for the functions always used."""
        kernel32 = self.kernel32
        ntdll = self.ntdll

        # Kernel32 functions
        kernel32.CreateFileW.argtypes = [
            LPCWSTR, DWORD, DWORD, PVOID, DWORD, DWORD, HANDLE]
        kernel32.CreateFileW.restype = HANDLE
        kernel32.CloseHandle.argtypes = [HANDLE]
        kernel32.CloseHandle.restype = BOOL
        kernel32.DeviceIoControl.argtypes = [
            HANDLE, DWORD, PVOID, DWORD, PVOID, DWORD, POINTER(DWORD), PVOID]
        kernel32.DeviceIoControl.restype = BOOL
        kernel32.GetVolumeInformationW.argtypes = [
            LPCWSTR, LPWSTR, DWORD, POINTER(DWORD),
            POINTER(DWORD), POINTER(DWORD), LPWSTR, DWORD]
        kernel32.GetVolumeInformationW.restype = BOOL
        kernel32.GetLastError.argtypes = []
        kernel32.GetLastError.restype = DWORD
        kernel32.QueryDosDeviceW.argtypes = [LPCWSTR, LPWSTR, DWORD]
        kernel32.QueryDosDeviceW.restype = DWORD
        kernel32.GetVersion.argtypes = []
        kernel32.GetVersion.restype = DWORD

        # NT functions.  They return an NTSTATUS, which is a *signed* 32-bit
        # value: callers throughout this module test ``status < 0`` to detect
        # failure, and that test can never be true with an unsigned restype.
        ntdll.NtOpenFile.argtypes = [
            POINTER(HANDLE), DWORD, POINTER(OBJECT_ATTRIBUTES),
            POINTER(IO_STATUS_BLOCK), DWORD, DWORD]
        ntdll.NtOpenFile.restype = LONG
        ntdll.NtQueryInformationFile.argtypes = [
            HANDLE, POINTER(IO_STATUS_BLOCK), PVOID, ULONG, DWORD]
        ntdll.NtQueryInformationFile.restype = LONG
        ntdll.NtFsControlFile.argtypes = [
            HANDLE, HANDLE, PVOID, PVOID, POINTER(IO_STATUS_BLOCK),
            ULONG, PVOID, ULONG, PVOID, ULONG]
        ntdll.NtFsControlFile.restype = LONG
        ntdll.NtDeviceIoControlFile.argtypes = [
            HANDLE, HANDLE, PVOID, PVOID, POINTER(IO_STATUS_BLOCK),
            ULONG, PVOID, ULONG, PVOID, ULONG]
        ntdll.NtDeviceIoControlFile.restype = LONG
        ntdll.NtClose.argtypes = [HANDLE]
        ntdll.NtClose.restype = LONG
        ntdll.RtlInitUnicodeString.argtypes = [
            POINTER(UNICODE_STRING), LPCWSTR]
        ntdll.RtlInitUnicodeString.restype = None

        # User32 functions
        self.user32.CharUpperW.argtypes = [LPWSTR]
        self.user32.CharUpperW.restype = LPWSTR

    def _check_volume_functions(self):
        """Declare the volume enumeration functions if kernel32 exports them.

        Sets ``volume_functions_available`` to True only when every one of
        them could be resolved.
        """
        self.volume_functions_available = False
        kernel32 = self.kernel32
        try:
            kernel32.FindFirstVolumeW.argtypes = [LPWSTR, DWORD]
            kernel32.FindFirstVolumeW.restype = HANDLE
            kernel32.FindNextVolumeW.argtypes = [HANDLE, LPWSTR, DWORD]
            kernel32.FindNextVolumeW.restype = BOOL
            kernel32.FindVolumeClose.argtypes = [HANDLE]
            kernel32.FindVolumeClose.restype = BOOL
            kernel32.GetVolumePathNamesForVolumeNameW.argtypes = [
                LPCWSTR, LPWSTR, DWORD, POINTER(DWORD)]
            kernel32.GetVolumePathNamesForVolumeNameW.restype = BOOL
        except AttributeError:
            # A missing export surfaces as AttributeError on the DLL object;
            # anything else is a real error and must not be hidden.
            return
        self.volume_functions_available = True
class PhysicalDiskAnalyzer:
    """Maps a sector of a physical disk to the mounted volume that holds it."""

    # Sector size assumed when the physical device cannot be queried.
    _DEFAULT_BYTES_PER_SECTOR = 512
    # Size of the buffer handed to IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS, and
    # the layout parsed out of it: an 8-byte header (extent count at +0)
    # followed by 24-byte extents (disk number +0, start +8, length +16).
    _EXTENTS_BUFFER_SIZE = 1000
    _EXTENTS_HEADER_SIZE = 8
    _EXTENT_SIZE = 24

    def __init__(self, api: "WindowsAPI"):
        self.api = api

    def find_volume_containing_sector(
            self, device_path: str,
            physical_sector: int) -> Optional["PhysicalToLogicalMapping"]:
        """Find which volume contains a physical sector.

        Uses the volume enumeration API when it is available and the reported
        Windows major version is above 4, the drive-letter scan otherwise.

        Returns:
            The mapping for the first matching volume, or None.
        """
        version = self.api.kernel32.GetVersion() & 0xFF
        if version <= 4 or not self.api.volume_functions_available:
            return self._find_volume_legacy(device_path, physical_sector)
        return self._find_volume_modern(device_path, physical_sector)

    def _get_device_parameters(self, device_path: str) -> Tuple[bool, int, int]:
        """Query a physical device.

        Returns:
            ``(ok, device_number, bytes_per_sector)``.  ``ok`` is False, with
            both numbers 0, when the device cannot be opened or queried, when
            it does not report device type 7, or when it reports a non-zero
            partition number.
        """
        ntdll = self.api.ntdll
        failure = (False, 0, 0)

        # RtlInitUnicodeString stores a pointer to the characters instead of
        # copying them.  Passing a Python str would hand it a temporary
        # conversion that ctypes releases as soon as the call returns, so the
        # path lives in an explicit buffer that stays referenced for as long
        # as the UNICODE_STRING is in use.
        path_buffer = ctypes.create_unicode_buffer(device_path)
        ustr = UNICODE_STRING()
        ntdll.RtlInitUnicodeString(ctypes.byref(ustr), path_buffer)

        obj_attr = OBJECT_ATTRIBUTES()
        obj_attr.Length = ctypes.sizeof(OBJECT_ATTRIBUTES)
        obj_attr.RootDirectory = None
        obj_attr.ObjectName = ctypes.cast(ctypes.byref(ustr), PVOID)
        obj_attr.Attributes = WindowsConstants.OBJ_CASE_INSENSITIVE
        obj_attr.SecurityDescriptor = None
        obj_attr.SecurityQualityOfService = None

        io_status = IO_STATUS_BLOCK()
        handle = HANDLE()
        status = ntdll.NtOpenFile(
            ctypes.byref(handle),
            WindowsConstants.FILE_READ_ATTRIBUTES | WindowsConstants.SYNCHRONIZE,
            ctypes.byref(obj_attr),
            ctypes.byref(io_status),
            WindowsConstants.FILE_SHARE_READ | WindowsConstants.FILE_SHARE_WRITE,
            WindowsConstants.FILE_SYNCHRONOUS_IO_NONALERT
        )
        if status < 0:
            return failure

        try:
            # Three DWORDs: device type, device number, partition number.
            device_info = (DWORD * 3)()
            status = ntdll.NtDeviceIoControlFile(
                handle,
                None, None, None,
                ctypes.byref(io_status),
                NTFSConstants.IOCTL_STORAGE_GET_DEVICE_NUMBER,
                None, 0,
                ctypes.byref(device_info),
                12
            )
            if status < 0 or device_info[0] != 7 or device_info[2] != 0:
                return failure

            geom = DISK_GEOMETRY()
            status = ntdll.NtDeviceIoControlFile(
                handle,
                None, None, None,
                ctypes.byref(io_status),
                NTFSConstants.IOCTL_DISK_GET_DRIVE_GEOMETRY,
                None, 0,
                ctypes.byref(geom),
                ctypes.sizeof(geom)
            )
            if status < 0:
                return failure
            return True, device_info[1], geom.BytesPerSector
        finally:
            ntdll.NtClose(handle)

    def _find_volume_legacy(
            self, device_path: str,
            physical_sector: int) -> Optional["PhysicalToLogicalMapping"]:
        """Legacy method for finding volumes (Windows 2000 and earlier).

        Walks drive letters A..Z and compares the sector against each
        partition's start and length.  Returns None when the device cannot be
        queried or no partition contains the sector.
        """
        success, _device_num, _bytes_per_sector = self._get_device_parameters(device_path)
        if not success:
            return None

        kernel32 = self.api.kernel32
        share_mode = WindowsConstants.FILE_SHARE_READ | WindowsConstants.FILE_SHARE_WRITE
        for drive_ord in range(ord('A'), ord('Z') + 1):
            drive_letter = chr(drive_ord)
            # Only used to find out whether the drive letter is defined.
            target_path = ctypes.create_unicode_buffer(100)
            if kernel32.QueryDosDeviceW(f"{drive_letter}:", target_path, 100) == 0:
                continue

            handle = kernel32.CreateFileW(
                f"\\\\.\\{drive_letter}:",
                WindowsConstants.GENERIC_READ,
                share_mode,
                None,
                WindowsConstants.OPEN_EXISTING,
                WindowsConstants.FILE_FLAG_NO_BUFFERING,
                None
            )
            if handle == WindowsConstants.INVALID_HANDLE_VALUE:
                continue

            try:
                geom = DISK_GEOMETRY()
                bytes_returned = DWORD()
                if not kernel32.DeviceIoControl(
                    handle,
                    NTFSConstants.IOCTL_DISK_GET_DRIVE_GEOMETRY,
                    None, 0,
                    ctypes.byref(geom),
                    ctypes.sizeof(geom),
                    ctypes.byref(bytes_returned),
                    None
                ):
                    continue
                if geom.BytesPerSector == 0:
                    # Would divide by zero below; nothing can be mapped.
                    continue

                part_info = PARTITION_INFORMATION()
                if not kernel32.DeviceIoControl(
                    handle,
                    NTFSConstants.IOCTL_DISK_GET_PARTITION_INFO,
                    None, 0,
                    ctypes.byref(part_info),
                    ctypes.sizeof(part_info),
                    ctypes.byref(bytes_returned),
                    None
                ):
                    continue

                start_sector = part_info.StartingOffset // geom.BytesPerSector
                length_sectors = part_info.PartitionLength // geom.BytesPerSector
                if start_sector <= physical_sector < start_sector + length_sectors:
                    return PhysicalToLogicalMapping(
                        drive_letter, physical_sector - start_sector, physical_sector)
            finally:
                kernel32.CloseHandle(handle)
        return None

    def _find_volume_modern(
            self, device_path: str,
            physical_sector: int) -> Optional["PhysicalToLogicalMapping"]:
        """Modern method for finding volumes using volume APIs.

        When ``device_path`` can be queried, only extents located on that
        disk are considered and its sector size is used to convert byte
        offsets to sectors.  When it cannot, every disk is considered and a
        512-byte sector is assumed.
        """
        kernel32 = self.api.kernel32

        known, device_num, bytes_per_sector = self._get_device_parameters(device_path)
        if not known or bytes_per_sector == 0:
            device_num = None
            bytes_per_sector = self._DEFAULT_BYTES_PER_SECTOR

        volume_name = ctypes.create_unicode_buffer(100)
        find_handle = kernel32.FindFirstVolumeW(volume_name, 100)
        if find_handle == WindowsConstants.INVALID_HANDLE_VALUE:
            return None

        try:
            while True:
                mapping = self._map_sector_on_volume(
                    volume_name, physical_sector, device_num, bytes_per_sector)
                if mapping is not None:
                    return mapping
                if not kernel32.FindNextVolumeW(find_handle, volume_name, 100):
                    break
        finally:
            kernel32.FindVolumeClose(find_handle)
        return None

    def _map_sector_on_volume(
            self, volume_name, physical_sector: int,
            device_num: Optional[int],
            bytes_per_sector: int) -> Optional["PhysicalToLogicalMapping"]:
        """Check one enumerated volume for the sector.

        ``volume_name`` is the buffer filled in by FindFirst/NextVolumeW.
        ``device_num`` restricts the match to extents on that disk; None
        accepts any disk.  Returns None when the volume cannot be opened or
        queried, when no extent contains the sector, or when the volume has
        no drive-letter mount point.
        """
        kernel32 = self.api.kernel32
        vol_handle = kernel32.CreateFileW(
            volume_name.value.rstrip('\\'),
            WindowsConstants.FILE_LIST_DIRECTORY,
            WindowsConstants.FILE_SHARE_READ | WindowsConstants.FILE_SHARE_WRITE,
            None,
            WindowsConstants.OPEN_EXISTING,
            0,
            None
        )
        if vol_handle == WindowsConstants.INVALID_HANDLE_VALUE:
            return None

        try:
            extents_buffer = ctypes.create_string_buffer(self._EXTENTS_BUFFER_SIZE)
            bytes_returned = DWORD()
            if not kernel32.DeviceIoControl(
                vol_handle,
                NTFSConstants.IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS,
                None, 0,
                extents_buffer,
                self._EXTENTS_BUFFER_SIZE,
                ctypes.byref(bytes_returned),
                None
            ):
                return None

            num_extents = struct.unpack_from('<I', extents_buffer, 0)[0]
            # Never read past the buffer, whatever count was reported.
            capacity = (
                (self._EXTENTS_BUFFER_SIZE - self._EXTENTS_HEADER_SIZE)
                // self._EXTENT_SIZE)
            for index in range(min(num_extents, capacity)):
                offset = self._EXTENTS_HEADER_SIZE + index * self._EXTENT_SIZE
                disk_num = struct.unpack_from('<I', extents_buffer, offset)[0]
                start_offset, extent_length = struct.unpack_from(
                    '<QQ', extents_buffer, offset + 8)

                if device_num is not None and disk_num != device_num:
                    # Sector numbers are only comparable on the same disk.
                    continue

                start_sector = start_offset // bytes_per_sector
                end_sector = (start_offset + extent_length) // bytes_per_sector
                if not start_sector <= physical_sector < end_sector:
                    continue

                mount_points = ctypes.create_unicode_buffer(260)
                returned = DWORD()
                if not (kernel32.GetVolumePathNamesForVolumeNameW(
                        volume_name,
                        mount_points,
                        260,
                        ctypes.byref(returned)
                ) and mount_points.value):
                    continue

                # .value is the first mount point; only "X:\" style paths
                # yield a drive letter.
                path = mount_points.value
                if len(path) >= 3 and path[1] == ':':
                    if num_extents > 1:
                        print("Error: Volume supports physical sector mapping but has multiple extents")
                    return PhysicalToLogicalMapping(
                        path[0], physical_sector - start_sector, physical_sector)
            return None
        finally:
            kernel32.CloseHandle(vol_handle)
class NTFSVolume:
    """An NTFS volume opened by drive letter.

    Usable as a context manager: the volume is opened on entry and closed on
    exit.
    """

    # A file reference keeps the MFT record number in its low 48 bits.
    _RECORD_NUMBER_MASK = 0xFFFFFFFFFFFF
    # Offset of the raw file record inside the FSCTL_GET_NTFS_FILE_RECORD
    # output: an 8-byte file reference and a 4-byte length come first.
    _RECORD_DATA_OFFSET = 12

    def __init__(self, drive_letter: str):
        self.drive_letter = drive_letter.upper()
        self.api = WindowsAPI()
        self.volume_info: Optional[VolumeInfo] = None
        self._handle: Optional[HANDLE] = None

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def open(self):
        """Open the volume and load its NTFS geometry.

        Raises:
            VolumeOpenError: the drive cannot be queried or opened, is not
                NTFS, or its volume data cannot be read.  No handle is left
                open when this is raised.
        """
        kernel32 = self.api.kernel32
        root = f"{self.drive_letter}:\\"
        fs_name = ctypes.create_unicode_buffer(64)

        # Check file system
        if not kernel32.GetVolumeInformationW(
            root, None, 0, None, None, None, fs_name, 64
        ):
            # The DLLs are loaded with use_last_error=True, so the error code
            # of the failed call is the one ctypes saved; calling
            # GetLastError() through ctypes returns an unrelated value.
            err = ctypes.get_last_error()
            raise VolumeOpenError(f"Could not open drive {self.drive_letter} (error {err})")
        if fs_name.value.upper() != "NTFS":
            raise VolumeOpenError(f"Drive {self.drive_letter} is not NTFS")

        # Open volume handle
        device_path = f"\\\\.\\{self.drive_letter}:"
        handle = kernel32.CreateFileW(
            device_path,
            WindowsConstants.GENERIC_READ,
            WindowsConstants.FILE_SHARE_READ | WindowsConstants.FILE_SHARE_WRITE,
            None,
            WindowsConstants.OPEN_EXISTING,
            WindowsConstants.FILE_FLAG_NO_BUFFERING |
            WindowsConstants.FILE_FLAG_BACKUP_SEMANTICS |
            WindowsConstants.FILE_FLAG_WRITE_THROUGH,
            None
        )
        if handle == WindowsConstants.INVALID_HANDLE_VALUE:
            err = ctypes.get_last_error()
            raise VolumeOpenError(f"Could not open drive {self.drive_letter} (error {err})")

        self._handle = handle
        try:
            self._load_volume_info()
        except Exception:
            # __exit__ never runs when __enter__ fails, so release the handle
            # here before propagating.
            self.close()
            raise

    def close(self):
        """Close the volume.  Safe to call when it is not open."""
        if self._handle:
            self.api.kernel32.CloseHandle(self._handle)
        self._handle = None
        # Drop the cached geometry so read_mft_record() reports "not open"
        # instead of issuing I/O on a closed handle.
        self.volume_info = None

    def _load_volume_info(self):
        """Query FSCTL_GET_NTFS_VOLUME_DATA and populate ``volume_info``.

        Raises:
            VolumeOpenError: the control call failed.
        """
        io_status = IO_STATUS_BLOCK()
        ntfs_data = NTFS_VOLUME_DATA_BUFFER()
        status = self.api.ntdll.NtFsControlFile(
            self._handle,
            None, None, None,
            ctypes.byref(io_status),
            NTFSConstants.FSCTL_GET_NTFS_VOLUME_DATA,
            None, 0,
            ctypes.byref(ntfs_data),
            ctypes.sizeof(ntfs_data)
        )
        if status < 0:
            raise VolumeOpenError(f"Could not get NTFS volume data for drive {self.drive_letter}")

        # One file record segment plus room for the output header.
        record_buffer_size = ntfs_data.BytesPerFileRecordSegment + 16
        self.volume_info = VolumeInfo(
            handle=self._handle,
            drive_letter=self.drive_letter,
            bytes_per_sector=ntfs_data.BytesPerSector,
            bytes_per_cluster=ntfs_data.BytesPerCluster,
            sectors_per_cluster=ntfs_data.BytesPerCluster // ntfs_data.BytesPerSector,
            bytes_per_file_record=ntfs_data.BytesPerFileRecordSegment,
            mft_start_lcn=ntfs_data.MftStartLcn,
            mft_total_records=ntfs_data.MftValidDataLength // ntfs_data.BytesPerFileRecordSegment,
            buffer_size=record_buffer_size,
            buffer=ctypes.create_string_buffer(record_buffer_size)
        )

    def read_mft_record(self, record_number: int) -> Optional[bytes]:
        """Read the raw bytes of one MFT record.

        Returns:
            The record (``bytes_per_file_record`` bytes), or None when the
            control call fails or the file system returned a different
            record than the one requested.

        Raises:
            VolumeOpenError: the volume is not open.
        """
        if not self.volume_info:
            raise VolumeOpenError("Volume not open")
        info = self.volume_info

        input_buffer = NTFS_FILE_RECORD_INPUT_BUFFER()
        input_buffer.FileReferenceNumber = record_number
        io_status = IO_STATUS_BLOCK()
        status = self.api.ntdll.NtFsControlFile(
            self._handle,
            None, None, None,
            ctypes.byref(io_status),
            NTFSConstants.FSCTL_GET_NTFS_FILE_RECORD,
            ctypes.byref(input_buffer),
            ctypes.sizeof(input_buffer),
            info.buffer,
            info.buffer_size
        )
        if status < 0:
            return None

        # FSCTL_GET_NTFS_FILE_RECORD answers a request for a record that is
        # not in use with the nearest lower-numbered record that is.  The
        # output starts with the reference of the record actually returned;
        # refuse anything but the record that was asked for.
        returned_ref = struct.unpack_from('<Q', info.buffer, 0)[0]
        if (returned_ref ^ record_number) & self._RECORD_NUMBER_MASK:
            return None

        # Skip the output header to get to the actual MFT record
        start = self._RECORD_DATA_OFFSET
        return info.buffer.raw[start:start + info.bytes_per_file_record]
@dataclass
class FileNameInfo:
    """The names recorded for one file: its Win32 name and its DOS name."""
    win32_name: Optional[str] = None
    dos_name: Optional[str] = None

    def get_display_name(self) -> str:
        """Return the name to show for the file.

        The Win32 name wins whenever it is present; the DOS name is used only
        when there is no Win32 name; an empty string means neither is known.
        The DOS name is never appended to the Win32 name.
        """
        for candidate in (self.win32_name, self.dos_name):
            if candidate:
                return candidate
        return ""
class NTFSFileRecord:
    """One MFT file record and the attributes parsed out of it.

    ``is_valid`` is True when the data carries the ``FILE`` signature and the
    in-use flag; ``attributes`` is only populated for valid records.
    """

    # Smallest attribute length accepted: the part of the attribute header
    # shared by resident and non-resident attributes.
    _MIN_ATTRIBUTE_LENGTH = 16

    def __init__(self, record_number: int, data: bytes):
        self.record_number = record_number
        self.data = data
        self.is_valid = self._validate()
        self.attributes = []
        if self.is_valid:
            self._parse_attributes()

    def _validate(self) -> bool:
        """Return True for a record that is long enough, carries the FILE
        signature and has the in-use flag (bit 0 of the flags word) set."""
        if len(self.data) < 48:
            return False
        # Check FILE signature
        if self.data[:4] != b'FILE':
            return False
        # Check flags (in use)
        flags = struct.unpack_from('<H', self.data, 22)[0]
        return bool(flags & 0x01)

    def _parse_attributes(self):
        """Walk the attribute list and append an NTFSAttribute per entry.

        Parsing stops at the end marker (type 0xFFFFFFFF or 0) and at the
        first attribute whose length is shorter than an attribute header or
        runs past the end of the record, so a damaged record yields the
        attributes before the damage instead of raising.
        """
        data = self.data
        end = len(data)
        # Offset of the first attribute is stored at byte 20 of the record.
        attr_offset = struct.unpack_from('<H', data, 20)[0]
        while attr_offset < end - 16:
            attr_type, attr_length = struct.unpack_from('<II', data, attr_offset)
            if attr_type == 0xFFFFFFFF or attr_type == 0:
                break
            if attr_length < self._MIN_ATTRIBUTE_LENGTH or attr_length > end - attr_offset:
                break
            self.attributes.append(
                NTFSAttribute(data[attr_offset:attr_offset + attr_length]))
            attr_offset += attr_length

    def get_file_names(self) -> "FileNameInfo":
        """Collect the file names stored in resident $FILE_NAME attributes.

        A name in the Win32 or Win32&DOS namespace becomes ``win32_name`` and
        a DOS name becomes ``dos_name``.  When the record has no Win32 name
        but does have a POSIX-namespace name, that name is reported as
        ``win32_name`` so the file is not left nameless.
        """
        names = FileNameInfo()
        posix_name = None
        for attr in self.attributes:
            if attr.type != NTFSAttributeType.FILE_NAME or attr.is_non_resident:
                continue
            filename, namespace = attr.get_filename_with_namespace()
            if not filename:
                continue
            # Namespace values: 0=POSIX, 1=Win32, 2=DOS, 3=Win32&DOS
            if namespace == 2:
                names.dos_name = filename
            elif namespace in (1, 3):
                names.win32_name = filename
            elif namespace == 0 and posix_name is None:
                posix_name = filename
        if names.win32_name is None and posix_name is not None:
            names.win32_name = posix_name
        return names

    def get_file_name(self) -> str:
        """Return the display name built from ``get_file_names()``."""
        return self.get_file_names().get_display_name()

    def is_system_file(self) -> bool:
        """Return True when the record number is below 16."""
        return self.record_number < 16
class NTFSAttribute:
    """One attribute record cut out of an MFT file record.

    ``data`` must hold at least the first 12 bytes of the attribute header;
    the constructor reads the type, length, non-resident flag and the
    attribute name from it.
    """

    # Offset, inside a resident attribute header, of the 16-bit offset to
    # the attribute value.
    _VALUE_OFFSET_FIELD = 20
    # Offset, inside a non-resident attribute header, of the 16-bit offset
    # to the run list.
    _RUNLIST_OFFSET_FIELD = 32
    # Offset of the name-length byte inside a $FILE_NAME value; the
    # namespace byte and then the UTF-16 name follow it.
    _FILENAME_LENGTH_OFFSET = 64

    def __init__(self, data: bytes):
        self.data = data
        self.type, self.length = struct.unpack_from('<II', data, 0)
        self.is_non_resident = bool(data[8])
        self.name_length = data[9]
        self.name_offset = struct.unpack_from('<H', data, 10)[0]
        self.name = self._get_name()

    def _get_name(self) -> str:
        """Return the attribute name, or "" when it has none or the name
        would extend past the attribute data."""
        name_end = self.name_offset + self.name_length * 2
        if self.name_length > 0 and name_end <= len(self.data):
            return self.data[self.name_offset:name_end].decode('utf-16le', errors='ignore')
        return ""

    def _resident_value_offset(self) -> Optional[int]:
        """Return the offset of a resident attribute's value, or None when
        the header is too short to contain that field."""
        if len(self.data) < self._VALUE_OFFSET_FIELD + 2:
            return None
        return struct.unpack_from('<H', self.data, self._VALUE_OFFSET_FIELD)[0]

    def get_filename(self) -> Optional[str]:
        """Get filename from FILE_NAME attribute (None if unavailable)."""
        filename, _ = self.get_filename_with_namespace()
        return filename

    def get_filename_with_namespace(self) -> Tuple[Optional[str], int]:
        """Get filename and namespace from FILE_NAME attribute.

        Returns:
            ``(name, namespace)``, or ``(None, -1)`` when this is not a
            resident $FILE_NAME attribute or the name does not fit inside
            the attribute data.
        """
        unavailable = (None, -1)
        if self.type != NTFSAttributeType.FILE_NAME or self.is_non_resident:
            return unavailable
        value_start = self._resident_value_offset()
        if value_start is None:
            return unavailable

        length_pos = value_start + self._FILENAME_LENGTH_OFFSET
        name_start = length_pos + 2
        if name_start >= len(self.data):
            return unavailable
        fn_length = self.data[length_pos]
        namespace = self.data[length_pos + 1]
        name_end = name_start + fn_length * 2
        if name_end > len(self.data):
            return unavailable
        filename = self.data[name_start:name_end].decode('utf-16le', errors='ignore')
        return filename, namespace

    def get_parent_reference(self) -> Optional[int]:
        """Get parent reference from FILE_NAME attribute.

        Returns:
            The low 48 bits of the parent directory's file reference (its
            MFT record number), or None when this is not a resident
            $FILE_NAME attribute or the value is truncated.
        """
        if self.type != NTFSAttributeType.FILE_NAME or self.is_non_resident:
            return None
        value_start = self._resident_value_offset()
        if value_start is None or value_start + 8 > len(self.data):
            return None
        return struct.unpack_from('<Q', self.data, value_start)[0] & 0xFFFFFFFFFFFF

    def get_data_runs(self) -> List[Tuple[int, int]]:
        """Get data runs for non-resident DATA attribute.

        Returns:
            ``(starting_lcn, cluster_count)`` tuples in run-list order.
            Sparse runs are omitted.  Empty for any other kind of attribute
            and for a header too short to hold the run-list offset.
        """
        if not self.is_non_resident or self.type != NTFSAttributeType.DATA:
            return []
        if len(self.data) < self._RUNLIST_OFFSET_FIELD + 2:
            return []
        runlist_offset = struct.unpack_from('<H', self.data, self._RUNLIST_OFFSET_FIELD)[0]
        return self._decode_runlist(self.data, runlist_offset)

    @staticmethod
    def _decode_runlist(data: bytes, pos: int) -> List[Tuple[int, int]]:
        """Decode the run list that starts at ``data[pos]``.

        Each run begins with a header byte: the low nibble is the size of the
        run-length field, the high nibble the size of the LCN-offset field.
        The offset is signed and relative to the previous run's start.
        Decoding stops at a zero header, at a zero-sized length field, or at
        a run that extends past the end of ``data``.
        """
        runs = []
        current_lcn = 0
        end = len(data)
        while pos < end:
            header = data[pos]
            if header == 0:
                break
            length_bytes = header & 0x0F
            offset_bytes = (header >> 4) & 0x0F
            if length_bytes == 0:
                break

            offset_start = pos + 1 + length_bytes
            next_pos = offset_start + offset_bytes
            if next_pos > end:
                # Truncated run: stop rather than decode a partial value.
                break

            run_length = int.from_bytes(data[pos + 1:offset_start], 'little')
            # signed=True performs the sign extension for every field width.
            run_offset = int.from_bytes(data[offset_start:next_pos], 'little', signed=True)
            current_lcn += run_offset

            # A run without an offset field is sparse (no clusters on disk).
            if run_length > 0 and offset_bytes > 0 and current_lcn >= 0:
                runs.append((current_lcn, run_length))
            pos = next_pos
        return runs
class NTFSAnalyzer: | |
"""Main NTFS analysis class""" | |
def __init__(self):
    """Create the Windows API wrapper and a disk analyzer that shares it."""
    api = WindowsAPI()
    self.api = api
    self.disk_analyzer = PhysicalDiskAnalyzer(api)
def analyze_sector(self, volume: "NTFSVolume", sector: int,
                   max_records: Optional[int] = 1000000) -> Optional[Tuple[int, str]]:
    """Find which file contains a sector.

    The sector is converted to a cluster number, then MFT records are
    checked in ascending order - the 16 system records first - until one
    owns that cluster.

    Args:
        volume: An open volume.
        sector: Logical sector number on that volume.
        max_records: Upper bound on the MFT record numbers scanned; None
            scans every record the volume reports.

    Returns:
        ``(record_number, path)`` of the first match, or None.

    Raises:
        VolumeOpenError: the volume is not open.
    """
    info = volume.volume_info
    if info is None:
        raise VolumeOpenError("Volume not open")

    target_cluster = sector // info.sectors_per_cluster
    total_records = info.mft_total_records
    limit = total_records if max_records is None else min(total_records, max_records)

    # First check system files
    for record_num in range(16):
        if self._check_file_contains_sector(volume, record_num, target_cluster):
            return (record_num, self._get_file_path(volume, record_num))

    # Then check regular files
    for record_num in range(16, limit):
        if record_num % 10000 == 0:
            logger.debug("Scanned %d records...", record_num)
        if self._check_file_contains_sector(volume, record_num, target_cluster):
            return (record_num, self._get_file_path(volume, record_num))

    if limit < total_records:
        # A miss is not conclusive when part of the MFT was skipped.
        logger.warning(
            "Scan stopped after %d of %d MFT records; the sector may belong "
            "to a record that was not examined.", limit, total_records)
    return None
def analyze_physical_sector(self, device_path: str, physical_sector: int) -> bool:
    """Report which volume and which file hold a physical disk sector.

    The sector is first mapped to a volume and a logical sector; that volume
    is then opened and searched for the owning file.  Progress and results
    are printed.

    Returns:
        True when the owning file was found, False when either the volume
        or the file could not be determined.
    """
    sector_label = f"{physical_sector} (0x{physical_sector:x})"

    mapping = self.disk_analyzer.find_volume_containing_sector(device_path, physical_sector)
    if mapping is None:
        print(f"Error: Could not find physical sector {sector_label} on any volume.")
        return False
    print(f"Physical sector {sector_label} is on volume {mapping.drive_letter}.")

    # Continue with the logical sector on the volume that was found
    logical_sector = mapping.logical_sector
    with NTFSVolume(mapping.drive_letter) as volume:
        found = self.analyze_sector(volume, logical_sector)
        if found is None:
            print(f"Could not locate file containing logical sector {logical_sector}")
            return False

        record_num, path = found
        print()
        print(f"Logical sector {logical_sector} is in file {record_num}.")
        print(path)

        # Detailed listing, when the record can be read back
        raw_record = volume.read_mft_record(record_num)
        if raw_record:
            self.display_file_info(
                volume, NTFSFileRecord(record_num, raw_record), show_path=False)
        return True
def _check_file_contains_sector(self, volume: "NTFSVolume", record_number: int,
                                target_cluster: int) -> bool:
    """Tell whether any non-resident $DATA run of a record covers a cluster.

    Returns False when the record cannot be read or is not a valid, in-use
    file record.  The ``$Bad`` stream of the $BadClus system file is not
    considered.
    """
    raw_record = volume.read_mft_record(record_number)
    if not raw_record:
        return False

    record = NTFSFileRecord(record_number, raw_record)
    if not record.is_valid:
        return False

    is_bad_clus_file = record_number == SystemFileIndex.BAD_CLUS
    for attr in record.attributes:
        # Skip $Bad stream in $BadClus file
        if is_bad_clus_file and attr.name == "$Bad":
            continue
        if attr.type != NTFSAttributeType.DATA or not attr.is_non_resident:
            continue
        if any(first_lcn <= target_cluster < first_lcn + cluster_count
               for first_lcn, cluster_count in attr.get_data_runs()):
            return True
    return False
def _get_file_path(self, volume: "NTFSVolume", record_number: int) -> str:
    """Get the full path of a file.

    System records (numbers below 16) are returned by their system name.
    For any other record the path is built by following parent references
    up to the root directory; the walk stops early at a record that cannot
    be read, is not valid, has no parent, or was already visited, in which
    case the components collected so far are returned.

    The name and the parent of each step are taken from the *same*
    $FILE_NAME attribute.  A file with several hard links carries one such
    attribute per link, each with its own parent, and mixing the name of one
    link with the parent of another would produce a path that never existed.
    """
    if record_number < 16:
        return SystemFileIndex(record_number).get_name()

    path_components = []
    current_record = record_number
    visited = set()
    while current_record != SystemFileIndex.ROOT and current_record not in visited:
        visited.add(current_record)
        record_data = volume.read_mft_record(current_record)
        if not record_data:
            break
        record = NTFSFileRecord(current_record, record_data)
        if not record.is_valid:
            break

        # Pick one link: the first Win32 / Win32&DOS name that has a parent,
        # otherwise the first name of any namespace that has one.
        chosen = None
        for attr in record.attributes:
            if attr.type != NTFSAttributeType.FILE_NAME or attr.is_non_resident:
                continue
            ref = attr.get_parent_reference()
            if ref is None:
                continue
            link_name, namespace = attr.get_filename_with_namespace()
            if namespace in (1, 3):
                chosen = (link_name, ref)
                break
            if chosen is None:
                chosen = (link_name, ref)

        if chosen is None:
            # No usable parent: keep whatever name the record offers and stop.
            filename = record.get_file_name()
            if filename:
                path_components.append(filename)
            break

        filename, parent_ref = chosen
        if not filename:
            filename = record.get_file_name()
        if filename:
            path_components.append(filename)
        if parent_ref == current_record:
            break
        current_record = parent_ref

    path_components.reverse()
    return f"{volume.drive_letter}:\\" + "\\".join(path_components)
def analyze_file(self, volume: NTFSVolume, file_path: str) -> Optional[NTFSFileRecord]:
    """Locate and read the MFT record that backs a file path.

    The file is opened through ``NtOpenFile`` for attribute access only,
    its internal index number is queried with ``NtQueryInformationFile``,
    and the low 48 bits of that number are used as the MFT record number
    to read from ``volume``.

    Args:
        volume: Volume from which the MFT record is read.
        file_path: Win32 path of the file; it is prefixed with the NT
            ``\\??\\`` namespace before being opened.

    Returns:
        The parsed file record for ``file_path``.

    Raises:
        FileRecordError: If the file cannot be opened, its information
            cannot be queried, or its MFT record cannot be read.
    """
    # Open file to get its record number
    nt_path = f"\\??\\{file_path}"
    # Let ctypes size the buffer: with a 16-bit wchar_t, characters outside
    # the BMP occupy two units, so a size of len(nt_path) + 1 is too small
    # for such paths and makes create_unicode_buffer raise ValueError.
    path_buffer = ctypes.create_unicode_buffer(nt_path)
    ustr = UNICODE_STRING()
    self.api.ntdll.RtlInitUnicodeString(ctypes.byref(ustr), path_buffer)

    obj_attr = OBJECT_ATTRIBUTES()
    obj_attr.Length = ctypes.sizeof(OBJECT_ATTRIBUTES)
    obj_attr.RootDirectory = None
    obj_attr.ObjectName = ctypes.cast(ctypes.byref(ustr), PVOID)
    obj_attr.Attributes = WindowsConstants.OBJ_CASE_INSENSITIVE
    obj_attr.SecurityDescriptor = None
    obj_attr.SecurityQualityOfService = None

    handle = HANDLE()
    io_status = IO_STATUS_BLOCK()
    status = self.api.ntdll.NtOpenFile(
        ctypes.byref(handle),
        WindowsConstants.FILE_READ_ATTRIBUTES | WindowsConstants.SYNCHRONIZE,
        ctypes.byref(obj_attr),
        ctypes.byref(io_status),
        WindowsConstants.FILE_SHARE_READ,
        WindowsConstants.FILE_SYNCHRONOUS_IO_NONALERT
    )
    # Negative status values are treated as failure.
    if status < 0:
        raise FileRecordError("Could not open file")

    try:
        file_info = FILE_INTERNAL_INFORMATION()
        status = self.api.ntdll.NtQueryInformationFile(
            handle,
            ctypes.byref(io_status),
            ctypes.byref(file_info),
            ctypes.sizeof(file_info),
            WindowsConstants.FILE_INTERNAL_INFORMATION
        )
        if status < 0:
            raise FileRecordError("Could not query file information")

        # Keep only the low 48 bits of the index number as the record number.
        record_number = file_info.IndexNumber & 0xFFFFFFFFFFFF

        # Read the file record
        record_data = volume.read_mft_record(record_number)
        if not record_data:
            raise FileRecordError(f"Could not read file record {record_number}")
        return NTFSFileRecord(record_number, record_data)
    finally:
        # The handle is closed on every path once it has been opened.
        self.api.ntdll.NtClose(handle)
def display_file_info(self, volume: NTFSVolume, record: NTFSFileRecord, show_path: bool = True):
    """Print a summary of a file record to standard output.

    When ``show_path`` is true the record's name or path is printed
    first. Each attribute then gets one line with its type name, its
    optional stream name and whether it is resident. Non-resident DATA
    attributes additionally list the logical sector range of every data
    run, and resident FILE_NAME attributes list the stored name together
    with its namespace. A blank line closes the listing.

    Args:
        volume: Volume the record belongs to; supplies the
            sectors-per-cluster factor and the path lookup.
        record: File record to describe.
        show_path: Whether to print the name/path heading.
    """
    if show_path:
        if not record.is_system_file():
            print(self._get_file_path(volume, record.record_number))
        else:
            try:
                print(SystemFileIndex(record.record_number).get_name())
            except ValueError:
                print(f"$SystemFile{record.record_number}")

    known_types = tuple(NTFSAttributeType.__members__.values())
    namespace_labels = {0: "POSIX", 1: "Win32", 2: "DOS", 3: "Win32&DOS"}

    for attr in record.attributes:
        # Known attribute types are shown by name, unknown ones as hex.
        if attr.type in known_types:
            header = f" {NTFSAttributeType(attr.type).get_name()}"
        else:
            header = f" ${attr.type:04X}"
        if attr.name:
            header += f" {attr.name}"

        if attr.is_non_resident:
            print(header + " (nonresident)")
            if attr.type != NTFSAttributeType.DATA:
                continue
            for lcn, length in attr.get_data_runs():
                # Convert the cluster run into an inclusive sector range.
                per_cluster = volume.volume_info.sectors_per_cluster
                start_sector = lcn * per_cluster
                end_sector = (lcn + length) * per_cluster - 1
                print(f" Logical sectors {start_sector}-{end_sector} (0x{start_sector:x}-0x{end_sector:x})")
            continue

        print(header + " (resident)")
        # Show all filenames for FILE_NAME attributes
        if attr.type == NTFSAttributeType.FILE_NAME:
            filename, namespace = attr.get_filename_with_namespace()
            if filename:
                namespace_str = namespace_labels.get(namespace, f"Unknown({namespace})")
                print(f" {filename} [{namespace_str}]")

    print()
class CommandLineArgs:
    """Parsed command line arguments.

    Attributes:
        mode: One of 'volume', 'physical' or 'file'; None until parsed.
        drive_letter: Upper-case drive letter (volume mode only).
        physical_device: Device path as given (physical mode only).
        file_path: File path as given (file mode only).
        sectors: Sector numbers given after the first argument.
    """

    def __init__(self):
        self.mode = None  # 'volume', 'physical', 'file'
        self.drive_letter = None
        self.physical_device = None
        self.file_path = None
        self.sectors = []

    @classmethod
    def parse(cls, args: List[str]) -> Optional['CommandLineArgs']:
        """Parse an argv-style list into a CommandLineArgs instance.

        The first argument selects the mode: a single letter or
        ``X:`` means a volume, a path starting with ``\\\\.\\Physical``
        or any other non-existing path means a device (which requires at
        least one sector), and an existing path means a file. Remaining
        arguments are sector numbers, decimal or ``0x``-prefixed hex;
        they are ignored in file mode.

        Args:
            args: Full argument vector including the program name.

        Returns:
            The parsed arguments, or None when the argument count is out
            of range, a device is given without a sector, or a sector is
            not a non-negative integer.
        """
        if len(args) < 2 or len(args) > 22:
            return None

        result = cls()
        arg1 = args[1]

        # Check if it's a drive letter
        if len(arg1) == 1 and arg1.isalpha():
            result.mode = 'volume'
            result.drive_letter = arg1.upper()
        elif len(arg1) == 2 and arg1[1] == ':' and arg1[0].isalpha():
            result.mode = 'volume'
            result.drive_letter = arg1[0].upper()
        elif arg1.startswith('\\\\.\\Physical') or not os.path.exists(arg1):
            # Either an explicit physical device or something that could be
            # an NT device path; both need a sector to look up.
            result.mode = 'physical'
            result.physical_device = arg1
            if len(args) < 3:
                return None
        else:
            result.mode = 'file'
            result.file_path = arg1

        # No sectors in file mode; the other modes take them from args[2:].
        if result.mode == 'file':
            return result

        for text in args[2:]:
            try:
                sector = int(text, 16) if text.lower().startswith('0x') else int(text)
            except ValueError:
                return None
            # A sector number is an offset on the volume/device, so a
            # negative value is invalid input rather than something to look up.
            if sector < 0:
                return None
            result.sectors.append(sector)

        return result
def main():
    """Run the utility from the command line and return its exit status.

    Prints a banner, parses ``sys.argv`` and refuses to continue without
    Administrator privileges. Depending on the parsed mode it then dumps
    the first 16 MFT records of a volume, resolves logical sectors on a
    volume, resolves sectors on a physical device, or describes a single
    file.

    Returns:
        0 on success; 1 after a usage error or any reported failure.
    """
    print("NTFS File Information Utility")

    # Parse arguments
    args = CommandLineArgs.parse(sys.argv)
    if ctypes.windll.shell32.IsUserAnAdmin() == 0:
        print('This script requires Administrator privileges...')
        args = None

    if not args:
        program = os.path.basename(sys.argv[0])
        print(f"\nUsage: {program} drive-letter [logical-sector-number]")
        print(f" {program} NT-device-path physical-sector-number")
        print(f" {program} full-win32-path")
        return 1

    analyzer = NTFSAnalyzer()
    try:
        if args.mode == 'volume':
            with NTFSVolume(args.drive_letter) as volume:
                if not args.sectors:
                    # Show system files (dump all files would be too much for default)
                    print(f"Drive {args.drive_letter}:")
                    print("*" * 44)
                    for index in range(16):
                        raw = volume.read_mft_record(index)
                        if not raw:
                            continue
                        system_record = NTFSFileRecord(index, raw)
                        if system_record.is_valid:
                            analyzer.display_file_info(volume, system_record)
                else:
                    # Analyze specific sectors
                    for sector in args.sectors:
                        print()
                        located = analyzer.analyze_sector(volume, sector)
                        if not located:
                            print(f"Could not locate file containing sector {sector}")
                            continue
                        record_num, path = located
                        print(f"Logical sector {sector} is in file {record_num}.")
                        print(path)
                        # Show detailed info
                        raw = volume.read_mft_record(record_num)
                        if raw:
                            owner = NTFSFileRecord(record_num, raw)
                            analyzer.display_file_info(volume, owner, show_path=False)
        elif args.mode == 'physical':
            # Physical device mode
            for sector in args.sectors:
                analyzer.analyze_physical_sector(args.physical_device, sector)
        elif args.mode == 'file':
            args.file_path = os.path.abspath(args.file_path)
            full_path = args.file_path
            if not (':' in full_path and os.path.exists(full_path)):
                print("Error: Full path must include drive letter and exist")
                return 1
            # The first character of the absolute path is used as the drive letter.
            with NTFSVolume(full_path[0]) as volume:
                file_record = analyzer.analyze_file(volume, full_path)
                print()
                analyzer.display_file_info(volume, file_record)
    except NTFSError as e:
        print(f"Error: {e}")
        return 1
    except Exception as e:
        print(f"Unexpected error: {e}")
        import traceback
        traceback.print_exc()
        return 1

    return 0
# Run the utility only when executed as a script; main()'s return value
# is passed to sys.exit as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment