Skip to content

Instantly share code, notes, and snippets.

@UserUnknownFactor
Last active August 13, 2025 07:41
Show Gist options
  • Save UserUnknownFactor/465182a34594090246bda1f2e4a4724f to your computer and use it in GitHub Desktop.
Save UserUnknownFactor/465182a34594090246bda1f2e4a4724f to your computer and use it in GitHub Desktop.
Full Python port of NTFS File Information Utility
"""
NTFS File Information Utility
Dumps information about an NTFS volume, and optionally determines
which volume and file contains a particular sector and vice versa.
"""
import ctypes
from ctypes import wintypes
import os
import sys
import struct
from enum import IntEnum, Enum
from dataclasses import dataclass
from typing import Optional, Tuple, List, Dict, Union
from pathlib import Path
import logging
# Message-only log lines at INFO level and above.
logging.basicConfig(format='%(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

# ctypes aliases, named after the Win32 types they stand for.

# Pointers and handles
PVOID = ctypes.c_void_p
POINTER = ctypes.POINTER
HANDLE = wintypes.HANDLE
HMODULE = wintypes.HMODULE
LPCVOID = wintypes.LPCVOID
FARPROC = wintypes.LPCVOID

# Integers
BYTE = wintypes.BYTE
WORD = wintypes.WORD
DWORD = wintypes.DWORD
BOOL = wintypes.BOOL
LONG = wintypes.LONG
ULONG = wintypes.ULONG
LARGE_INTEGER = ctypes.c_int64
ULARGE_INTEGER = ctypes.c_uint64
# Unsigned integer as wide as a pointer in the running interpreter.
ULONG_PTR = ctypes.c_uint64 if ctypes.sizeof(ctypes.c_void_p) == 8 else ctypes.c_uint32

# Characters and strings
WCHAR = wintypes.WCHAR
LPCSTR = wintypes.LPCSTR
LPCWSTR = wintypes.LPCWSTR
LPWSTR = wintypes.LPWSTR
class WindowsConstants(IntEnum):
    """Win32 / NT API constants used by this tool.

    Several members share a numeric value (for example ``GENERIC_READ`` and
    ``FILE_FLAG_WRITE_THROUGH``). ``IntEnum`` makes the later one an alias of
    the earlier one; the members are only ever used as plain integers.
    """
    GENERIC_READ = 0x80000000
    FILE_SHARE_READ = 0x00000001
    FILE_SHARE_WRITE = 0x00000002
    OPEN_EXISTING = 0x00000003
    FILE_ATTRIBUTE_NORMAL = 0x00000080
    FILE_FLAG_NO_BUFFERING = 0x20000000
    FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
    FILE_FLAG_WRITE_THROUGH = 0x80000000
    # Functions declared with a HANDLE (void pointer) restype hand the failure
    # sentinel back as an unsigned pointer-sized integer, never as -1, so the
    # constant must be that unsigned value for ``handle == INVALID_HANDLE_VALUE``
    # to ever be true.
    INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
    # File Information Classes
    FILE_INTERNAL_INFORMATION = 6
    FILE_NAME_INFORMATION = 9
    # Object Attributes
    OBJ_CASE_INSENSITIVE = 0x00000040
    # Access Masks
    FILE_READ_ATTRIBUTES = 0x0080
    SYNCHRONIZE = 0x00100000
    FILE_LIST_DIRECTORY = 0x0001
    FILE_SYNCHRONOUS_IO_NONALERT = 0x00000020
class NTFSConstants(IntEnum):
    """Device I/O control codes and NT status values used by this tool."""
    # FSCTL codes
    FSCTL_GET_NTFS_VOLUME_DATA = 0x00090064
    FSCTL_GET_NTFS_FILE_RECORD = 0x00090068
    # Previously 0x0009006F, which is the code of FSCTL_GET_VOLUME_BITMAP.
    FSCTL_QUERY_ALLOCATED_RANGES = 0x000940CF
    # IOCTL codes
    IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS = 0x00560000
    # Previously 0x00560024, which is the code of the PHYSICAL_TO_LOGICAL request.
    IOCTL_VOLUME_LOGICAL_TO_PHYSICAL = 0x00560020
    IOCTL_VOLUME_PHYSICAL_TO_LOGICAL = 0x00560024
    IOCTL_DISK_GET_DRIVE_GEOMETRY = 0x00070000
    IOCTL_DISK_GET_PARTITION_INFO = 0x00074004
    IOCTL_STORAGE_GET_DEVICE_NUMBER = 0x002D1080
    # NT Status codes
    STATUS_SUCCESS = 0x00000000
    STATUS_BUFFER_OVERFLOW = 0x80000005
class NTFSAttributeType(IntEnum):
    """NTFS attribute type codes, as read from an attribute header."""
    UNUSED = 0x00
    STANDARD_INFORMATION = 0x10
    ATTRIBUTE_LIST = 0x20
    FILE_NAME = 0x30
    OBJECT_ID = 0x40
    SECURITY_DESCRIPTOR = 0x50
    VOLUME_NAME = 0x60
    VOLUME_INFORMATION = 0x70
    DATA = 0x80
    INDEX_ROOT = 0x90
    INDEX_ALLOCATION = 0xA0
    BITMAP = 0xB0
    REPARSE_POINT = 0xC0
    EA_INFORMATION = 0xD0
    EA = 0xE0

    def get_name(self) -> str:
        """Return the ``$NAME`` display string for this attribute type.

        Every display name is ``$`` followed by the member name, except that
        type 0xC0 is shown as ``$SYMBOLIC_LINK`` when the host reports a
        Windows major version of 4 or lower.

        The Windows version is only queried for that one member, and a missing
        ``sys.getwindowsversion`` (non-Windows interpreter) is treated as a
        modern system instead of raising ``AttributeError``.
        """
        if self is NTFSAttributeType.REPARSE_POINT:
            get_version = getattr(sys, "getwindowsversion", None)
            legacy = get_version is not None and get_version().major <= 4
            return "$SYMBOLIC_LINK" if legacy else "$REPARSE_POINT"
        return f"${self.name}"
class SystemFileIndex(IntEnum):
    """MFT record numbers 0-15, which this tool treats as NTFS system files."""
    MFT = 0
    MFT_MIRR = 1
    LOG_FILE = 2
    VOLUME = 3
    ATTR_DEF = 4
    ROOT = 5
    BITMAP = 6
    BOOT = 7
    BAD_CLUS = 8
    SECURE = 9
    UPCASE = 10
    EXTENDED = 11
    RESERVED_12 = 12
    RESERVED_13 = 13
    RESERVED_14 = 14
    RESERVED_15 = 15

    def get_name(self) -> str:
        """Return the display name for this system file.

        Record 9 is shown as ``$Quota`` when the host reports a Windows major
        version of 4 or lower, otherwise as ``$Secure``.

        The Windows version is only queried for that one member, and a missing
        ``sys.getwindowsversion`` (non-Windows interpreter) is treated as a
        modern system instead of raising ``AttributeError``.
        """
        if self is SystemFileIndex.SECURE:
            get_version = getattr(sys, "getwindowsversion", None)
            legacy = get_version is not None and get_version().major <= 4
            return "$Quota" if legacy else "$Secure"
        names = {
            SystemFileIndex.MFT: "$MFT",
            SystemFileIndex.MFT_MIRR: "$MFTMirr",
            SystemFileIndex.LOG_FILE: "$LogFile",
            SystemFileIndex.VOLUME: "$Volume",
            SystemFileIndex.ATTR_DEF: "$AttrDef",
            SystemFileIndex.ROOT: "$Root",
            SystemFileIndex.BITMAP: "$Bitmap",
            SystemFileIndex.BOOT: "$Boot",
            SystemFileIndex.BAD_CLUS: "$BadClus",
            SystemFileIndex.UPCASE: "$UpCase",
            SystemFileIndex.EXTENDED: "$Extended",
            SystemFileIndex.RESERVED_12: "$Reserved12",
            SystemFileIndex.RESERVED_13: "$Reserved13",
            SystemFileIndex.RESERVED_14: "$Reserved14",
            SystemFileIndex.RESERVED_15: "$Reserved15",
        }
        return names.get(self, f"$SystemFile{self.value}")
@dataclass
class VolumeInfo:
    """Geometry and scratch state of an opened NTFS volume.

    Populated by ``NTFSVolume._load_volume_info`` from the result of
    FSCTL_GET_NTFS_VOLUME_DATA.
    """
    # Open volume handle (the same value NTFSVolume keeps in ``_handle``).
    handle: HANDLE
    drive_letter: str
    bytes_per_sector: int
    # BytesPerCluster // BytesPerSector
    sectors_per_cluster: int
    bytes_per_cluster: int
    # Size in bytes of one MFT file record segment.
    bytes_per_file_record: int
    mft_start_lcn: int
    # MftValidDataLength // BytesPerFileRecordSegment
    mft_total_records: int
    # Reusable output buffer for FSCTL_GET_NTFS_FILE_RECORD.
    buffer: ctypes.Array
    # Size of ``buffer``: bytes_per_file_record + 16.
    buffer_size: int
@dataclass
class PhysicalToLogicalMapping:
    """Result of locating a physical disk sector on a mounted volume."""
    # Single drive letter of the volume, without a colon.
    drive_letter: str
    # Sector number relative to the start of the matching partition/extent.
    logical_sector: int
    # The physical sector number that was looked up.
    physical_sector: int
class NTFSError(Exception):
    """Root of the exceptions raised by this module's NTFS operations."""


class VolumeOpenError(NTFSError):
    """A volume could not be opened or queried."""


class FileRecordError(NTFSError):
    """A file or its MFT record could not be opened or read."""
# Windows API structures
class IO_STATUS_BLOCK(ctypes.Structure):
    """Status block passed by reference to the ntdll ``Nt*`` calls in this file."""
    _fields_ = [
        ('Status', LONG),
        # Pointer-sized; ULONG_PTR is chosen from the interpreter's pointer width.
        ('Information', ULONG_PTR),
    ]
class UNICODE_STRING(ctypes.Structure):
    """String descriptor filled in by ``RtlInitUnicodeString``.

    A pointer to it is stored in ``OBJECT_ATTRIBUTES.ObjectName`` before
    calling ``NtOpenFile``.
    """
    _fields_ = [
        ('Length', WORD),
        ('MaximumLength', WORD),
        ('Buffer', LPWSTR),
    ]
class OBJECT_ATTRIBUTES(ctypes.Structure):
    """Describes the object to open; passed by reference to ``NtOpenFile``."""
    _fields_ = [
        # Callers set this to ctypes.sizeof(OBJECT_ATTRIBUTES).
        ('Length', ULONG),
        ('RootDirectory', HANDLE),
        # Untyped pointer; callers store the address of a UNICODE_STRING here.
        ('ObjectName', PVOID),
        # Callers pass WindowsConstants.OBJ_CASE_INSENSITIVE.
        ('Attributes', ULONG),
        ('SecurityDescriptor', PVOID),
        ('SecurityQualityOfService', PVOID),
    ]
class DISK_GEOMETRY(ctypes.Structure):
    """Output buffer for IOCTL_DISK_GET_DRIVE_GEOMETRY.

    Only ``BytesPerSector`` is read by the code in this file.
    """
    _fields_ = [
        ("Cylinders", LARGE_INTEGER),
        ("MediaType", DWORD),
        ("TracksPerCylinder", DWORD),
        ("SectorsPerTrack", DWORD),
        ("BytesPerSector", DWORD),
    ]
class PARTITION_INFORMATION(ctypes.Structure):
    """Output buffer for IOCTL_DISK_GET_PARTITION_INFO.

    ``StartingOffset`` and ``PartitionLength`` are byte counts; the code in
    this file divides them by the sector size to get sector numbers.

    The three flag fields are Win32 ``BOOLEAN`` (one byte each), not ``BOOL``
    (four bytes). Declaring them as ``BOOL`` misplaces every flag and inflates
    the structure size.
    """
    _fields_ = [
        ("StartingOffset", LARGE_INTEGER),
        ("PartitionLength", LARGE_INTEGER),
        ("HiddenSectors", DWORD),
        ("PartitionNumber", DWORD),
        ("PartitionType", BYTE),
        ("BootIndicator", wintypes.BOOLEAN),
        ("RecognizedPartition", wintypes.BOOLEAN),
        ("RewritePartition", wintypes.BOOLEAN),
    ]
class VOLUME_DISK_EXTENT(ctypes.Structure):
    """One disk extent of a volume: which disk, and the byte range on it.

    NOTE(review): not referenced by the visible code;
    ``PhysicalDiskAnalyzer._find_volume_modern`` decodes the same layout from
    a raw buffer with ``struct`` (24 bytes per entry) instead.
    """
    _fields_ = [
        ("DiskNumber", DWORD),
        ("StartingOffset", LARGE_INTEGER),
        ("ExtentLength", LARGE_INTEGER),
    ]
class VOLUME_DISK_EXTENTS(ctypes.Structure):
    """Extent count followed by the extent array.

    NOTE(review): presumably the output layout of
    IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS; it is not referenced by the visible
    code, which parses the raw buffer by hand.
    """
    _fields_ = [
        ("NumberOfDiskExtents", DWORD),
        # Declared with a single element; a real reply may carry more entries.
        ("Extents", VOLUME_DISK_EXTENT * 1),
    ]
class NTFS_VOLUME_DATA_BUFFER(ctypes.Structure):
    """Output buffer for FSCTL_GET_NTFS_VOLUME_DATA.

    ``NTFSVolume._load_volume_info`` reads ``BytesPerSector``,
    ``BytesPerCluster``, ``BytesPerFileRecordSegment``, ``MftStartLcn`` and
    ``MftValidDataLength``; the remaining fields are not read in this file.
    """
    _fields_ = [
        ("VolumeSerialNumber", LARGE_INTEGER),
        ("NumberSectors", LARGE_INTEGER),
        ("TotalClusters", LARGE_INTEGER),
        ("FreeClusters", LARGE_INTEGER),
        ("TotalReserved", LARGE_INTEGER),
        ("BytesPerSector", DWORD),
        ("BytesPerCluster", DWORD),
        ("BytesPerFileRecordSegment", DWORD),
        ("ClustersPerFileRecordSegment", DWORD),
        ("MftValidDataLength", LARGE_INTEGER),
        ("MftStartLcn", LARGE_INTEGER),
        ("Mft2StartLcn", LARGE_INTEGER),
        ("MftZoneStart", LARGE_INTEGER),
        ("MftZoneEnd", LARGE_INTEGER),
    ]
class NTFS_FILE_RECORD_INPUT_BUFFER(ctypes.Structure):
    """Input buffer for FSCTL_GET_NTFS_FILE_RECORD."""
    _fields_ = [
        # NTFSVolume.read_mft_record stores the requested MFT record number here.
        ("FileReferenceNumber", LARGE_INTEGER),
    ]
class FILE_INTERNAL_INFORMATION(ctypes.Structure):
    """Output buffer for ``NtQueryInformationFile`` with information class 6.

    The class number is ``WindowsConstants.FILE_INTERNAL_INFORMATION``.
    """
    _fields_ = [
        # NTFSAnalyzer.analyze_file keeps the low 48 bits as the MFT record number.
        ("IndexNumber", LARGE_INTEGER),
    ]
class FILE_ALLOCATED_RANGE_BUFFER(ctypes.Structure):
    """An (offset, length) byte range within a file.

    NOTE(review): not referenced by the visible code; presumably intended for
    FSCTL_QUERY_ALLOCATED_RANGES - confirm before use.
    """
    _fields_ = [
        ("FileOffset", LARGE_INTEGER),
        ("Length", LARGE_INTEGER),
    ]
class WindowsAPI:
    """Loads kernel32, ntdll and user32 and declares the prototypes used here.

    Attributes:
        kernel32, ntdll, user32: ``ctypes.WinDLL`` objects loaded with
            ``use_last_error=True``.
        volume_functions_available: True when the ``FindFirstVolumeW`` family
            could be resolved in kernel32.
    """

    def __init__(self):
        self.kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
        self.ntdll = ctypes.WinDLL('ntdll', use_last_error=True)
        self.user32 = ctypes.WinDLL('user32', use_last_error=True)
        self._setup_prototypes()
        self._check_volume_functions()

    def _setup_prototypes(self):
        """Declare argument and return types for every function this file calls."""
        # Kernel32 functions
        k32 = self.kernel32
        k32.CreateFileW.argtypes = [
            LPCWSTR, DWORD, DWORD, PVOID, DWORD, DWORD, HANDLE]
        k32.CreateFileW.restype = HANDLE
        k32.CloseHandle.argtypes = [HANDLE]
        k32.CloseHandle.restype = BOOL
        k32.DeviceIoControl.argtypes = [
            HANDLE, DWORD, PVOID, DWORD, PVOID, DWORD, POINTER(DWORD), PVOID]
        k32.DeviceIoControl.restype = BOOL
        k32.GetVolumeInformationW.argtypes = [
            LPCWSTR, LPWSTR, DWORD, POINTER(DWORD),
            POINTER(DWORD), POINTER(DWORD), LPWSTR, DWORD]
        k32.GetVolumeInformationW.restype = BOOL
        k32.GetLastError.argtypes = []
        k32.GetLastError.restype = DWORD
        k32.QueryDosDeviceW.argtypes = [LPCWSTR, LPWSTR, DWORD]
        k32.QueryDosDeviceW.restype = DWORD
        k32.GetVersion.argtypes = []
        k32.GetVersion.restype = DWORD

        # NT functions
        nt = self.ntdll
        nt.NtOpenFile.argtypes = [
            POINTER(HANDLE), DWORD, POINTER(OBJECT_ATTRIBUTES),
            POINTER(IO_STATUS_BLOCK), DWORD, DWORD]
        nt.NtQueryInformationFile.argtypes = [
            HANDLE, POINTER(IO_STATUS_BLOCK), PVOID, ULONG, DWORD]
        nt.NtFsControlFile.argtypes = [
            HANDLE, HANDLE, PVOID, PVOID, POINTER(IO_STATUS_BLOCK),
            ULONG, PVOID, ULONG, PVOID, ULONG]
        nt.NtDeviceIoControlFile.argtypes = [
            HANDLE, HANDLE, PVOID, PVOID, POINTER(IO_STATUS_BLOCK),
            ULONG, PVOID, ULONG, PVOID, ULONG]
        nt.NtClose.argtypes = [HANDLE]
        # NTSTATUS is a signed 32-bit value and the callers in this file test
        # ``status < 0``. An unsigned restype would make every failure code a
        # large positive integer and that test could never fire.
        for nt_function in (nt.NtOpenFile, nt.NtQueryInformationFile,
                            nt.NtFsControlFile, nt.NtDeviceIoControlFile,
                            nt.NtClose):
            nt_function.restype = LONG
        nt.RtlInitUnicodeString.argtypes = [
            POINTER(UNICODE_STRING), LPCWSTR]
        nt.RtlInitUnicodeString.restype = None

        # User32 functions
        self.user32.CharUpperW.argtypes = [LPWSTR]
        self.user32.CharUpperW.restype = LPWSTR

    def _check_volume_functions(self):
        """Declare the volume-enumeration functions if kernel32 exports them.

        Sets ``volume_functions_available`` to True only when all four
        functions were resolved.
        """
        self.volume_functions_available = False
        k32 = self.kernel32
        try:
            k32.FindFirstVolumeW.argtypes = [LPWSTR, DWORD]
            k32.FindFirstVolumeW.restype = HANDLE
            k32.FindNextVolumeW.argtypes = [HANDLE, LPWSTR, DWORD]
            k32.FindNextVolumeW.restype = BOOL
            k32.FindVolumeClose.argtypes = [HANDLE]
            k32.FindVolumeClose.restype = BOOL
            k32.GetVolumePathNamesForVolumeNameW.argtypes = [
                LPCWSTR, LPWSTR, DWORD, POINTER(DWORD)]
            k32.GetVolumePathNamesForVolumeNameW.restype = BOOL
        except AttributeError:
            # ctypes raises AttributeError for an export the DLL does not have;
            # anything else is a real error and must not be swallowed.
            return
        self.volume_functions_available = True
class PhysicalDiskAnalyzer:
    """Maps a sector of a physical disk to the mounted volume that contains it."""

    # Sector size assumed when the physical device itself cannot be queried.
    _DEFAULT_SECTOR_SIZE = 512
    # DeviceType value required from IOCTL_STORAGE_GET_DEVICE_NUMBER.
    _FILE_DEVICE_DISK = 7
    # Size in bytes of the IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS output buffer.
    _EXTENTS_BUFFER_SIZE = 1000
    # Per-extent layout in that buffer: DWORD disk number, 4 bytes of padding,
    # 64-bit starting offset, 64-bit length (24 bytes in total).
    _EXTENT_FORMAT = '<I4xQQ'
    _EXTENT_SIZE = 24

    def __init__(self, api: WindowsAPI):
        self.api = api

    @staticmethod
    def _nt_failed(status: int) -> bool:
        """Return True when an NTSTATUS has its high bit set (``status < 0``).

        Works whether ctypes delivered the status as a signed or an unsigned
        integer.
        """
        return bool(status & 0x80000000)

    @staticmethod
    def _is_invalid_handle(handle) -> bool:
        """Return True for the values ctypes yields for a failed HANDLE call.

        A HANDLE restype comes back as None (NULL) or as an unsigned
        pointer-sized integer, so INVALID_HANDLE_VALUE never appears as -1.
        """
        return handle is None or handle == HANDLE(-1).value

    def find_volume_containing_sector(self, device_path: str, physical_sector: int) -> Optional[PhysicalToLogicalMapping]:
        """Find which volume contains a physical sector.

        Args:
            device_path: Path of the physical disk device.
            physical_sector: Sector number relative to the start of that disk.

        Returns:
            The mapping to a drive letter and volume-relative sector, or None
            when no volume with a drive letter covers the sector.
        """
        version = self.api.kernel32.GetVersion() & 0xFF
        if version <= 4 or not self.api.volume_functions_available:
            return self._find_volume_legacy(device_path, physical_sector)
        return self._find_volume_modern(device_path, physical_sector)

    def _get_device_parameters(self, device_path: str) -> Tuple[bool, int, int]:
        """Query a physical disk device for its number and sector size.

        Returns:
            ``(success, device_number, bytes_per_sector)``; the numbers are 0
            when ``success`` is False.
        """
        # NtOpenFile takes NT namespace paths; translate the Win32 device
        # prefixes so that "\\.\PhysicalDrive0" style arguments work too.
        nt_path = device_path
        if nt_path.startswith(("\\\\.\\", "\\\\?\\")):
            nt_path = "\\??\\" + nt_path[4:]
        # RtlInitUnicodeString stores a pointer to the characters rather than
        # copying them, so the buffer must outlive every use of ``ustr``.
        # Passing a Python str would leave a dangling pointer once ctypes
        # released its temporary conversion buffer.
        path_buffer = ctypes.create_unicode_buffer(nt_path)
        ustr = UNICODE_STRING()
        self.api.ntdll.RtlInitUnicodeString(ctypes.byref(ustr), path_buffer)

        obj_attr = OBJECT_ATTRIBUTES()
        obj_attr.Length = ctypes.sizeof(OBJECT_ATTRIBUTES)
        obj_attr.RootDirectory = None
        obj_attr.ObjectName = ctypes.cast(ctypes.byref(ustr), PVOID)
        obj_attr.Attributes = WindowsConstants.OBJ_CASE_INSENSITIVE
        obj_attr.SecurityDescriptor = None
        obj_attr.SecurityQualityOfService = None

        io_status = IO_STATUS_BLOCK()
        handle = HANDLE()
        status = self.api.ntdll.NtOpenFile(
            ctypes.byref(handle),
            WindowsConstants.FILE_READ_ATTRIBUTES | WindowsConstants.SYNCHRONIZE,
            ctypes.byref(obj_attr),
            ctypes.byref(io_status),
            WindowsConstants.FILE_SHARE_READ | WindowsConstants.FILE_SHARE_WRITE,
            WindowsConstants.FILE_SYNCHRONOUS_IO_NONALERT
        )
        if self._nt_failed(status):
            return False, 0, 0
        try:
            # Three DWORDs: device type, device number, partition number.
            device_info = (DWORD * 3)()
            status = self.api.ntdll.NtDeviceIoControlFile(
                handle,
                None, None, None,
                ctypes.byref(io_status),
                NTFSConstants.IOCTL_STORAGE_GET_DEVICE_NUMBER,
                None, 0,
                ctypes.byref(device_info),
                ctypes.sizeof(device_info)
            )
            if (self._nt_failed(status)
                    or device_info[0] != self._FILE_DEVICE_DISK
                    or device_info[2] != 0):
                return False, 0, 0
            geom = DISK_GEOMETRY()
            status = self.api.ntdll.NtDeviceIoControlFile(
                handle,
                None, None, None,
                ctypes.byref(io_status),
                NTFSConstants.IOCTL_DISK_GET_DRIVE_GEOMETRY,
                None, 0,
                ctypes.byref(geom),
                ctypes.sizeof(geom)
            )
            if self._nt_failed(status):
                return False, 0, 0
            return True, device_info[1], geom.BytesPerSector
        finally:
            self.api.ntdll.NtClose(handle)

    def _find_volume_legacy(self, device_path: str, physical_sector: int) -> Optional[PhysicalToLogicalMapping]:
        """Legacy method for finding volumes (Windows 2000 and earlier).

        Probes drive letters A-Z and compares each partition's sector range
        with ``physical_sector``. The device is only checked for being a
        usable disk; its device number is not compared with the partitions.
        """
        success, _device_num, _bytes_per_sector = self._get_device_parameters(device_path)
        if not success:
            return None
        k32 = self.api.kernel32
        for drive_ord in range(ord('A'), ord('Z') + 1):
            drive_letter = chr(drive_ord)
            target_path = ctypes.create_unicode_buffer(100)
            if k32.QueryDosDeviceW(f"{drive_letter}:", target_path, 100) == 0:
                continue
            handle = k32.CreateFileW(
                f"\\\\.\\{drive_letter}:",
                WindowsConstants.GENERIC_READ,
                WindowsConstants.FILE_SHARE_READ | WindowsConstants.FILE_SHARE_WRITE,
                None,
                WindowsConstants.OPEN_EXISTING,
                WindowsConstants.FILE_FLAG_NO_BUFFERING,
                None
            )
            if self._is_invalid_handle(handle):
                continue
            try:
                geom = DISK_GEOMETRY()
                bytes_returned = DWORD()
                if not k32.DeviceIoControl(
                    handle,
                    NTFSConstants.IOCTL_DISK_GET_DRIVE_GEOMETRY,
                    None, 0,
                    ctypes.byref(geom),
                    ctypes.sizeof(geom),
                    ctypes.byref(bytes_returned),
                    None
                ):
                    continue
                if geom.BytesPerSector == 0:
                    # Avoid dividing by zero on a device reporting no geometry.
                    continue
                part_info = PARTITION_INFORMATION()
                if not k32.DeviceIoControl(
                    handle,
                    NTFSConstants.IOCTL_DISK_GET_PARTITION_INFO,
                    None, 0,
                    ctypes.byref(part_info),
                    ctypes.sizeof(part_info),
                    ctypes.byref(bytes_returned),
                    None
                ):
                    continue
                start_sector = part_info.StartingOffset // geom.BytesPerSector
                length_sectors = part_info.PartitionLength // geom.BytesPerSector
                if start_sector <= physical_sector < start_sector + length_sectors:
                    return PhysicalToLogicalMapping(
                        drive_letter, physical_sector - start_sector, physical_sector)
            finally:
                k32.CloseHandle(handle)
        return None

    def _find_volume_modern(self, device_path: str, physical_sector: int) -> Optional[PhysicalToLogicalMapping]:
        """Modern method for finding volumes using volume APIs.

        Enumerates all volumes and inspects their disk extents. When the
        device named by ``device_path`` can be queried, only extents on that
        disk are considered and its reported sector size is used. Otherwise
        every disk is considered and 512-byte sectors are assumed.
        """
        known, disk_number, sector_size = self._get_device_parameters(device_path)
        if not known or sector_size <= 0:
            disk_number = None
            sector_size = self._DEFAULT_SECTOR_SIZE

        k32 = self.api.kernel32
        volume_name = ctypes.create_unicode_buffer(100)
        find_handle = k32.FindFirstVolumeW(volume_name, 100)
        if self._is_invalid_handle(find_handle):
            return None
        try:
            while True:
                mapping = self._map_sector_on_volume(
                    volume_name, disk_number, sector_size, physical_sector)
                if mapping is not None:
                    return mapping
                if not k32.FindNextVolumeW(find_handle, volume_name, 100):
                    break
        finally:
            k32.FindVolumeClose(find_handle)
        return None

    def _map_sector_on_volume(self, volume_name, disk_number: Optional[int],
                              sector_size: int, physical_sector: int) -> Optional[PhysicalToLogicalMapping]:
        """Return a mapping if one extent of the named volume covers the sector."""
        extents = self._read_disk_extents(volume_name.value.rstrip('\\'))
        for extent_disk, start_offset, extent_length in extents:
            if disk_number is not None and extent_disk != disk_number:
                continue
            start_sector = start_offset // sector_size
            end_sector = (start_offset + extent_length) // sector_size
            if not start_sector <= physical_sector < end_sector:
                continue
            drive_letter = self._drive_letter_for_volume(volume_name)
            if drive_letter is None:
                continue
            if len(extents) > 1:
                print("Error: Volume supports physical sector mapping but has multiple extents")
            return PhysicalToLogicalMapping(
                drive_letter, physical_sector - start_sector, physical_sector)
        return None

    def _read_disk_extents(self, volume_path: str) -> List[Tuple[int, int, int]]:
        """Return ``(disk_number, start_offset, length)`` per extent of a volume.

        Returns an empty list when the volume cannot be opened or queried.
        """
        k32 = self.api.kernel32
        vol_handle = k32.CreateFileW(
            volume_path,
            WindowsConstants.FILE_LIST_DIRECTORY,
            WindowsConstants.FILE_SHARE_READ | WindowsConstants.FILE_SHARE_WRITE,
            None,
            WindowsConstants.OPEN_EXISTING,
            0,
            None
        )
        if self._is_invalid_handle(vol_handle):
            return []
        try:
            extents_buffer = ctypes.create_string_buffer(self._EXTENTS_BUFFER_SIZE)
            bytes_returned = DWORD()
            if not k32.DeviceIoControl(
                vol_handle,
                NTFSConstants.IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS,
                None, 0,
                extents_buffer,
                self._EXTENTS_BUFFER_SIZE,
                ctypes.byref(bytes_returned),
                None
            ):
                return []
            raw = extents_buffer.raw
        finally:
            k32.CloseHandle(vol_handle)

        (num_extents,) = struct.unpack_from('<I', raw, 0)
        extents = []
        offset = 8  # the extent array starts after the count and its padding
        for _ in range(num_extents):
            if offset + self._EXTENT_SIZE > len(raw):
                break  # never read past the buffer that was handed to the driver
            extents.append(struct.unpack_from(self._EXTENT_FORMAT, raw, offset))
            offset += self._EXTENT_SIZE
        return extents

    def _drive_letter_for_volume(self, volume_name) -> Optional[str]:
        """Return the drive letter of the volume's first mount path, if it has one."""
        mount_points = ctypes.create_unicode_buffer(260)
        returned = DWORD()
        if not self.api.kernel32.GetVolumePathNamesForVolumeNameW(
            volume_name,
            mount_points,
            260,
            ctypes.byref(returned)
        ):
            return None
        path = mount_points.value
        if len(path) >= 3 and path[1] == ':':
            return path[0]
        return None
class NTFSVolume:
    """An NTFS volume opened by drive letter for raw MFT record reads.

    Usable as a context manager: the volume is opened on entry and closed on
    exit.
    """

    def __init__(self, drive_letter: str):
        self.drive_letter = drive_letter.upper()
        self.api = WindowsAPI()
        self.volume_info: Optional[VolumeInfo] = None
        self._handle: Optional[HANDLE] = None

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def open(self):
        """Open the volume and load its NTFS geometry.

        Raises:
            VolumeOpenError: if the drive cannot be queried or opened, is not
                NTFS, or does not answer FSCTL_GET_NTFS_VOLUME_DATA.
        """
        root = f"{self.drive_letter}:\\"
        fs_name = ctypes.create_unicode_buffer(64)
        # Check file system
        if not self.api.kernel32.GetVolumeInformationW(
            root, None, 0, None, None, None, fs_name, 64
        ):
            # The DLLs are loaded with use_last_error=True, so the error code
            # of the failed call lives in ctypes' private copy. Calling
            # GetLastError() through ctypes does not reliably return it.
            err = ctypes.get_last_error()
            raise VolumeOpenError(f"Could not open drive {self.drive_letter} (error {err})")
        if fs_name.value.upper() != "NTFS":
            raise VolumeOpenError(f"Drive {self.drive_letter} is not NTFS")
        # Open volume handle
        device_path = f"\\\\.\\{self.drive_letter}:"
        handle = self.api.kernel32.CreateFileW(
            device_path,
            WindowsConstants.GENERIC_READ,
            WindowsConstants.FILE_SHARE_READ | WindowsConstants.FILE_SHARE_WRITE,
            None,
            WindowsConstants.OPEN_EXISTING,
            WindowsConstants.FILE_FLAG_NO_BUFFERING |
            WindowsConstants.FILE_FLAG_BACKUP_SEMANTICS |
            WindowsConstants.FILE_FLAG_WRITE_THROUGH,
            None
        )
        # A HANDLE restype comes back as None or as an unsigned pointer-sized
        # integer, so INVALID_HANDLE_VALUE never shows up as -1.
        if handle is None or handle == ctypes.c_void_p(-1).value:
            err = ctypes.get_last_error()
            raise VolumeOpenError(f"Could not open drive {self.drive_letter} (error {err})")
        self._handle = handle
        try:
            self._load_volume_info()
        except Exception:
            # __exit__ is not called when __enter__ raises; release the handle here.
            self.close()
            raise

    def close(self):
        """Close the volume handle, if open, and forget the cached geometry."""
        if self._handle:
            self.api.kernel32.CloseHandle(self._handle)
        self._handle = None
        # Makes read_mft_record raise instead of using a closed handle.
        self.volume_info = None

    def _load_volume_info(self):
        """Query FSCTL_GET_NTFS_VOLUME_DATA and build ``self.volume_info``.

        Raises:
            VolumeOpenError: if the control request fails.
        """
        io_status = IO_STATUS_BLOCK()
        ntfs_data = NTFS_VOLUME_DATA_BUFFER()
        status = self.api.ntdll.NtFsControlFile(
            self._handle,
            None, None, None,
            ctypes.byref(io_status),
            NTFSConstants.FSCTL_GET_NTFS_VOLUME_DATA,
            None, 0,
            ctypes.byref(ntfs_data),
            ctypes.sizeof(ntfs_data)
        )
        # High bit set means failure; this holds whether ctypes delivered the
        # NTSTATUS as a signed or an unsigned integer.
        if status & 0x80000000:
            raise VolumeOpenError(f"Could not get NTFS volume data for drive {self.drive_letter}")
        record_size = ntfs_data.BytesPerFileRecordSegment
        # 16 spare bytes cover the 12-byte header that precedes the record in
        # the FSCTL_GET_NTFS_FILE_RECORD reply.
        buffer_size = record_size + 16
        self.volume_info = VolumeInfo(
            handle=self._handle,
            drive_letter=self.drive_letter,
            bytes_per_sector=ntfs_data.BytesPerSector,
            bytes_per_cluster=ntfs_data.BytesPerCluster,
            sectors_per_cluster=ntfs_data.BytesPerCluster // ntfs_data.BytesPerSector,
            bytes_per_file_record=record_size,
            mft_start_lcn=ntfs_data.MftStartLcn,
            mft_total_records=ntfs_data.MftValidDataLength // record_size,
            buffer_size=buffer_size,
            buffer=ctypes.create_string_buffer(buffer_size)
        )

    def read_mft_record(self, record_number: int) -> Optional[bytes]:
        """Read one MFT record.

        Args:
            record_number: MFT record number to fetch.

        Returns:
            The raw record bytes, or None when the request fails or the file
            system answered with a different record.

        Raises:
            VolumeOpenError: if the volume is not open.
        """
        if not self.volume_info:
            raise VolumeOpenError("Volume not open")
        info = self.volume_info
        input_buffer = NTFS_FILE_RECORD_INPUT_BUFFER()
        input_buffer.FileReferenceNumber = record_number
        io_status = IO_STATUS_BLOCK()
        status = self.api.ntdll.NtFsControlFile(
            self._handle,
            None, None, None,
            ctypes.byref(io_status),
            NTFSConstants.FSCTL_GET_NTFS_FILE_RECORD,
            ctypes.byref(input_buffer),
            ctypes.sizeof(input_buffer),
            info.buffer,
            info.buffer_size
        )
        if status & 0x80000000:
            return None
        raw = info.buffer.raw
        # FSCTL_GET_NTFS_FILE_RECORD answers with the nearest in-use record at
        # or below the requested number. The reply starts with the number of
        # the record actually returned; reject anything else, so that a free
        # record is not reported with a neighbour's contents.
        (returned_reference,) = struct.unpack_from('<Q', raw, 0)
        if (returned_reference ^ record_number) & 0xFFFFFFFFFFFF:
            return None
        # Skip the 12-byte reply header (8-byte reference, 4-byte length).
        return raw[12:12 + info.bytes_per_file_record]
@dataclass
class FileNameInfo:
    """The Win32 and DOS names found for one file record."""
    win32_name: Optional[str] = None
    dos_name: Optional[str] = None

    def get_display_name(self) -> str:
        """Return the Win32 name if set, else the DOS name, else ``""``."""
        return self.win32_name or self.dos_name or ""
class NTFSFileRecord:
    """One MFT file record and the attributes parsed out of it.

    Attributes:
        record_number: MFT record number the bytes were read for.
        data: Raw record bytes.
        is_valid: True when the record has the ``FILE`` signature and its
            in-use flag set.
        attributes: Parsed ``NTFSAttribute`` objects (empty when invalid).
    """

    # Smallest record accepted by _validate.
    _MIN_RECORD_SIZE = 48
    # The attribute parser reads the first 12 bytes of every attribute; a
    # declared length below this is malformed and ends parsing.
    _MIN_ATTRIBUTE_SIZE = 16

    def __init__(self, record_number: int, data: bytes):
        self.record_number = record_number
        self.data = data
        self.is_valid = self._validate()
        self.attributes = []
        if self.is_valid:
            self._parse_attributes()

    def _validate(self) -> bool:
        """Return True for an in-use record carrying the ``FILE`` signature."""
        if len(self.data) < self._MIN_RECORD_SIZE:
            return False
        # Check FILE signature
        if self.data[:4] != b'FILE':
            return False
        # Check flags (in use)
        (flags,) = struct.unpack_from('<H', self.data, 22)
        return bool(flags & 0x01)

    def _parse_attributes(self):
        """Walk the attribute list and append each attribute to ``attributes``.

        Parsing stops at the end marker (type 0xFFFFFFFF), at a zero type, or
        at the first attribute whose declared length is implausible.
        """
        record_size = len(self.data)
        (attr_offset,) = struct.unpack_from('<H', self.data, 20)
        while attr_offset < record_size - 16:
            attr_type, attr_length = struct.unpack_from('<II', self.data, attr_offset)
            if attr_type in (0xFFFFFFFF, 0):
                break
            # A too-short length would crash NTFSAttribute's header parsing,
            # and a too-long one would run past the record.
            if attr_length < self._MIN_ATTRIBUTE_SIZE or attr_length > record_size - attr_offset:
                break
            self.attributes.append(
                NTFSAttribute(self.data[attr_offset:attr_offset + attr_length]))
            attr_offset += attr_length

    def get_file_names(self) -> "FileNameInfo":
        """Collect the Win32 and DOS names from the FILE_NAME attributes.

        When several attributes carry a name of the same kind, the last one
        wins. A POSIX-namespace name is used as the Win32 name when the record
        has no Win32 name at all, so such files are not left nameless.
        """
        names = FileNameInfo()
        posix_name = None
        for attr in self.attributes:
            if attr.type != NTFSAttributeType.FILE_NAME or attr.is_non_resident:
                continue
            filename, namespace = attr.get_filename_with_namespace()
            if not filename:
                continue
            # Namespace values: 0=POSIX, 1=Win32, 2=DOS, 3=Win32&DOS
            if namespace == 2:
                names.dos_name = filename
            elif namespace in (1, 3):
                names.win32_name = filename
            elif namespace == 0:
                posix_name = filename
        if names.win32_name is None and posix_name is not None:
            names.win32_name = posix_name
        return names

    def get_file_name(self) -> str:
        """Return the display name of the file (``""`` when it has none)."""
        return self.get_file_names().get_display_name()

    def is_system_file(self) -> bool:
        """Return True for record numbers below 16."""
        return self.record_number < 16
class NTFSAttribute:
    """One attribute of an MFT file record, parsed from its raw bytes.

    Attributes:
        data: Raw attribute bytes, starting at the attribute header.
        type: Attribute type code (first four bytes).
        length: Declared attribute length (bytes 4-8).
        is_non_resident: True when byte 8 is non-zero.
        name_length: Attribute name length in UTF-16 code units (byte 9).
        name_offset: Offset of the attribute name within ``data``.
        name: Decoded attribute name, or ``""``.
    """

    def __init__(self, data: bytes):
        self.data = data
        self.type, self.length = struct.unpack_from('<II', data, 0)
        self.is_non_resident = bool(data[8])
        self.name_length = data[9]
        (self.name_offset,) = struct.unpack_from('<H', data, 10)
        self.name = self._get_name()

    def _get_name(self) -> str:
        """Return the attribute name, or ``""`` if absent or out of bounds."""
        name_end = self.name_offset + self.name_length * 2
        if self.name_length > 0 and name_end <= len(self.data):
            return self.data[self.name_offset:name_end].decode('utf-16le', errors='ignore')
        return ""

    def _resident_value_offset(self) -> int:
        """Return the offset of a resident attribute's value (bytes 20-22)."""
        (value_offset,) = struct.unpack_from('<H', self.data, 20)
        return value_offset

    def get_filename(self) -> Optional[str]:
        """Return the file name of a resident FILE_NAME attribute, else None."""
        filename, _ = self.get_filename_with_namespace()
        return filename

    def get_filename_with_namespace(self) -> Tuple[Optional[str], int]:
        """Return ``(filename, namespace)`` of a resident FILE_NAME attribute.

        Returns ``(None, -1)`` for any other attribute or when the name does
        not fit inside the attribute data.
        """
        if self.type != NTFSAttributeType.FILE_NAME or self.is_non_resident:
            return None, -1
        # The name length is at value offset 64, the namespace at 65 and the
        # UTF-16 name itself starts at 66.
        name_start = self._resident_value_offset() + 66
        if name_start < len(self.data):
            fn_length = self.data[name_start - 2]
            namespace = self.data[name_start - 1]
            name_end = name_start + fn_length * 2
            if name_end <= len(self.data):
                filename = self.data[name_start:name_end].decode('utf-16le', errors='ignore')
                return filename, namespace
        return None, -1

    def get_parent_reference(self) -> Optional[int]:
        """Return the parent record number stored in a resident FILE_NAME value.

        Only the low 48 bits of the stored reference are returned. Returns
        None for any other attribute or when the value is out of bounds.
        """
        if self.type != NTFSAttributeType.FILE_NAME or self.is_non_resident:
            return None
        value_start = self._resident_value_offset()
        if value_start + 8 <= len(self.data):
            (reference,) = struct.unpack_from('<Q', self.data, value_start)
            return reference & 0xFFFFFFFFFFFF
        return None

    def get_data_runs(self) -> List[Tuple[int, int]]:
        """Return the ``(start_lcn, cluster_count)`` runs of a non-resident DATA attribute.

        Sparse runs (no offset field) are omitted. Any other attribute yields
        an empty list.
        """
        if not self.is_non_resident or self.type != NTFSAttributeType.DATA:
            return []
        (runlist_offset,) = struct.unpack_from('<H', self.data, 32)
        return self._decode_runlist(self.data, runlist_offset)

    @staticmethod
    def _decode_runlist(data: bytes, pos: int = 0) -> List[Tuple[int, int]]:
        """Decode a run list starting at ``pos``; stops at a terminator or truncation."""
        runs = []
        current_lcn = 0
        end = len(data)
        while pos < end:
            header = data[pos]
            length_bytes = header & 0x0F
            offset_bytes = header >> 4
            if header == 0 or length_bytes == 0:
                break
            offset_start = pos + 1 + length_bytes
            next_pos = offset_start + offset_bytes
            if next_pos > end:
                # Truncated run: stop, rather than index past the data or
                # record a half-read value.
                break
            run_length = int.from_bytes(data[pos + 1:offset_start], 'little')
            # Each offset is a signed delta from the previous run's start.
            # signed=True sign-extends every width, including 8 bytes.
            run_offset = int.from_bytes(data[offset_start:next_pos], 'little', signed=True)
            current_lcn += run_offset
            # offset_bytes == 0 marks a sparse run, which occupies no clusters.
            if run_length > 0 and offset_bytes > 0 and current_lcn >= 0:
                runs.append((current_lcn, run_length))
            pos = next_pos
        return runs
class NTFSAnalyzer:
    """Locates the file that owns a sector and prints file record details."""

    def __init__(self):
        self.api = WindowsAPI()
        self.disk_analyzer = PhysicalDiskAnalyzer(self.api)

    @staticmethod
    def _nt_failed(status: int) -> bool:
        """Return True when an NTSTATUS has its high bit set (``status < 0``).

        Works whether ctypes delivered the status as a signed or an unsigned
        integer.
        """
        return bool(status & 0x80000000)

    def analyze_sector(self, volume: NTFSVolume, sector: int,
                       max_records: Optional[int] = 1000000) -> Optional[Tuple[int, str]]:
        """Find which file contains a logical sector of ``volume``.

        Args:
            volume: An opened volume.
            sector: Volume-relative sector number.
            max_records: Upper bound on the number of MFT records scanned;
                None scans the whole MFT. Records 0-15 are always checked.

        Returns:
            ``(record_number, path)`` of the first record whose DATA runs
            cover the sector, or None.
        """
        target_cluster = sector // volume.volume_info.sectors_per_cluster
        # First check system files
        for record_num in range(16):
            if self._check_file_contains_sector(volume, record_num, target_cluster):
                return (record_num, self._get_file_path(volume, record_num))
        # Then check regular files
        scan_limit = volume.volume_info.mft_total_records
        if max_records is not None:
            scan_limit = min(scan_limit, max_records)
        for record_num in range(16, scan_limit):
            if record_num % 10000 == 0:
                logger.debug(f"Scanned {record_num} records...")
            if self._check_file_contains_sector(volume, record_num, target_cluster):
                return (record_num, self._get_file_path(volume, record_num))
        return None

    def analyze_physical_sector(self, device_path: str, physical_sector: int) -> bool:
        """Find and print the volume and file that hold a physical sector.

        Returns:
            True when a file was found, False otherwise.
        """
        mapping = self.disk_analyzer.find_volume_containing_sector(device_path, physical_sector)
        if not mapping:
            print(f"Error: Could not find physical sector {physical_sector} (0x{physical_sector:x}) on any volume.")
            return False
        print(f"Physical sector {physical_sector} (0x{physical_sector:x}) is on volume {mapping.drive_letter}.")
        # Now analyze the logical sector on that volume
        with NTFSVolume(mapping.drive_letter) as volume:
            result = self.analyze_sector(volume, mapping.logical_sector)
            if not result:
                print(f"Could not locate file containing logical sector {mapping.logical_sector}")
                return False
            record_num, path = result
            print()
            print(f"Logical sector {mapping.logical_sector} is in file {record_num}.")
            print(path)
            # Show detailed info
            record_data = volume.read_mft_record(record_num)
            if record_data:
                record = NTFSFileRecord(record_num, record_data)
                self.display_file_info(volume, record, show_path=False)
            return True

    def _check_file_contains_sector(self, volume: NTFSVolume, record_number: int,
                                    target_cluster: int) -> bool:
        """Return True if a non-resident DATA run of the record covers the cluster."""
        record_data = volume.read_mft_record(record_number)
        if not record_data:
            return False
        record = NTFSFileRecord(record_number, record_data)
        if not record.is_valid:
            return False
        for attr in record.attributes:
            # Skip $Bad stream in $BadClus file
            if record_number == SystemFileIndex.BAD_CLUS and attr.name == "$Bad":
                continue
            if attr.type != NTFSAttributeType.DATA or not attr.is_non_resident:
                continue
            if any(lcn <= target_cluster < lcn + length
                   for lcn, length in attr.get_data_runs()):
                return True
        return False

    def _get_file_path(self, volume: NTFSVolume, record_number: int) -> str:
        """Build the path of a record by following parent references to the root.

        Records 0-15 are reported by their system file name. The walk stops
        early at an unreadable or invalid record, or when a cycle is detected.
        """
        if record_number < 16:
            return SystemFileIndex(record_number).get_name()
        path_components = []
        current_record = record_number
        visited = set()
        while current_record != SystemFileIndex.ROOT and current_record not in visited:
            visited.add(current_record)
            record_data = volume.read_mft_record(current_record)
            if not record_data:
                break
            record = NTFSFileRecord(current_record, record_data)
            if not record.is_valid:
                break
            # An extension record has no FILE_NAME of its own; bytes 32-40 of
            # its header refer to the base record, which carries the names.
            (base_reference,) = struct.unpack_from('<Q', record.data, 32)
            base_record = base_reference & 0xFFFFFFFFFFFF
            if base_reference and base_record != current_record:
                current_record = base_record
                continue
            filename = record.get_file_name()
            if filename:
                path_components.append(filename)
            # Find parent reference (prefer Win32 namespace)
            parent_ref = None
            best_namespace = -1
            for attr in record.attributes:
                if attr.type != NTFSAttributeType.FILE_NAME or attr.is_non_resident:
                    continue
                ref = attr.get_parent_reference()
                if ref is None:
                    continue
                _, namespace = attr.get_filename_with_namespace()
                prefer_win32 = namespace in (1, 3) and best_namespace not in (1, 3)
                if prefer_win32 or parent_ref is None:
                    parent_ref = ref
                    best_namespace = namespace
            if parent_ref is None or parent_ref == current_record:
                break
            current_record = parent_ref
        path_components.reverse()
        return f"{volume.drive_letter}:\\" + "\\".join(path_components)

    def analyze_file(self, volume: NTFSVolume, file_path: str) -> Optional[NTFSFileRecord]:
        """Open a file by path and return its MFT record.

        Args:
            volume: The opened volume the file lives on.
            file_path: Full Win32 path of the file.

        Raises:
            FileRecordError: if the file cannot be opened or queried, or its
                record cannot be read.
        """
        # Open file to get its record number
        nt_path = f"\\??\\{file_path}"
        # The buffer must stay alive: the UNICODE_STRING only points into it.
        path_buffer = ctypes.create_unicode_buffer(nt_path, len(nt_path) + 1)
        ustr = UNICODE_STRING()
        self.api.ntdll.RtlInitUnicodeString(ctypes.byref(ustr), path_buffer)
        obj_attr = OBJECT_ATTRIBUTES()
        obj_attr.Length = ctypes.sizeof(OBJECT_ATTRIBUTES)
        obj_attr.RootDirectory = None
        obj_attr.ObjectName = ctypes.cast(ctypes.byref(ustr), PVOID)
        obj_attr.Attributes = WindowsConstants.OBJ_CASE_INSENSITIVE
        obj_attr.SecurityDescriptor = None
        obj_attr.SecurityQualityOfService = None
        handle = HANDLE()
        io_status = IO_STATUS_BLOCK()
        status = self.api.ntdll.NtOpenFile(
            ctypes.byref(handle),
            WindowsConstants.FILE_READ_ATTRIBUTES | WindowsConstants.SYNCHRONIZE,
            ctypes.byref(obj_attr),
            ctypes.byref(io_status),
            WindowsConstants.FILE_SHARE_READ,
            WindowsConstants.FILE_SYNCHRONOUS_IO_NONALERT
        )
        if self._nt_failed(status):
            raise FileRecordError("Could not open file")
        try:
            file_info = FILE_INTERNAL_INFORMATION()
            status = self.api.ntdll.NtQueryInformationFile(
                handle,
                ctypes.byref(io_status),
                ctypes.byref(file_info),
                ctypes.sizeof(file_info),
                WindowsConstants.FILE_INTERNAL_INFORMATION
            )
            if self._nt_failed(status):
                raise FileRecordError("Could not query file information")
            # Only the low 48 bits are used as the MFT record number.
            record_number = file_info.IndexNumber & 0xFFFFFFFFFFFF
            # Read the file record
            record_data = volume.read_mft_record(record_number)
            if not record_data:
                raise FileRecordError(f"Could not read file record {record_number}")
            return NTFSFileRecord(record_number, record_data)
        finally:
            self.api.ntdll.NtClose(handle)

    def display_file_info(self, volume: NTFSVolume, record: NTFSFileRecord, show_path: bool = True):
        """Print the attributes of a file record.

        Prints, in order: the path or system file name (when ``show_path``),
        one line per attribute, the sector ranges of non-resident DATA
        attributes, and the names held in FILE_NAME attributes.
        """
        if show_path:
            if record.is_system_file():
                try:
                    print(SystemFileIndex(record.record_number).get_name())
                except ValueError:
                    print(f"$SystemFile{record.record_number}")
            else:
                print(self._get_file_path(volume, record.record_number))
        for attr in record.attributes:
            try:
                attr_name = NTFSAttributeType(attr.type).get_name()
            except ValueError:
                # Type code not covered by the enum: show it in hex.
                attr_name = f"${attr.type:04X}"
            print(f" {attr_name}", end='')
            if attr.name:
                print(f" {attr.name}", end='')
            if attr.is_non_resident:
                print(" (nonresident)")
                if attr.type == NTFSAttributeType.DATA:
                    sectors_per_cluster = volume.volume_info.sectors_per_cluster
                    for lcn, length in attr.get_data_runs():
                        start_sector = lcn * sectors_per_cluster
                        end_sector = (lcn + length) * sectors_per_cluster - 1
                        print(f" Logical sectors {start_sector}-{end_sector} (0x{start_sector:x}-0x{end_sector:x})")
            else:
                print(" (resident)")
                # Show all filenames for FILE_NAME attributes
                if attr.type == NTFSAttributeType.FILE_NAME:
                    filename, namespace = attr.get_filename_with_namespace()
                    if filename:
                        namespace_str = {0: "POSIX", 1: "Win32", 2: "DOS", 3: "Win32&DOS"}.get(namespace, f"Unknown({namespace})")
                        print(f" {filename} [{namespace_str}]")
        print()
class CommandLineArgs:
    """Parsed command line arguments.

    Attributes:
        mode: ``'volume'``, ``'physical'`` or ``'file'``.
        drive_letter: Upper-case drive letter (volume mode).
        physical_device: Device path as given (physical mode).
        file_path: Path as given (file mode).
        sectors: Sector numbers given after the first argument.
    """

    def __init__(self):
        self.mode = None  # 'volume', 'physical', 'file'
        self.drive_letter = None
        self.physical_device = None
        self.file_path = None
        self.sectors = []

    @classmethod
    def parse(cls, args: List[str]) -> Optional['CommandLineArgs']:
        """Parse ``sys.argv``-style arguments.

        Args:
            args: Program name followed by 1 to 21 arguments.

        Returns:
            The parsed arguments, or None when the command line is invalid
            (wrong argument count, missing physical sector, or a sector that
            is not a non-negative decimal or ``0x`` hexadecimal number).
        """
        if not 2 <= len(args) <= 22:
            return None
        result = cls()
        arg1 = args[1]
        is_letter = arg1[:1].isascii() and arg1[:1].isalpha()
        if is_letter and (len(arg1) == 1 or (len(arg1) == 2 and arg1[1] == ':')):
            # "C" or "C:"
            result.mode = 'volume'
            result.drive_letter = arg1[0].upper()
        elif not arg1.startswith('\\\\.\\Physical') and os.path.exists(arg1):
            result.mode = 'file'
            result.file_path = arg1
        else:
            # Win32 physical drive name, or anything else: treat as an NT device path
            result.mode = 'physical'
            result.physical_device = arg1
            if len(args) < 3:
                return None
        if result.mode == 'file':
            # No sectors in file mode
            return result
        for text in args[2:]:
            try:
                sector = int(text, 16) if text.lower().startswith('0x') else int(text)
            except ValueError:
                return None
            if sector < 0:
                # A negative sector number cannot exist on any volume.
                return None
            result.sectors.append(sector)
        return result
def main():
    """Main program entry point.

    Returns:
        0 on success. 1 when the command line is invalid, the process is not
        elevated, an error occurs, or a requested sector could not be mapped
        to a file.
    """
    print("NTFS File Information Utility")
    # Parse arguments
    args = CommandLineArgs.parse(sys.argv)
    if ctypes.windll.shell32.IsUserAnAdmin() == 0:
        print('This script requires Administrator privileges...')
        args = None
    if not args:
        program = os.path.basename(sys.argv[0])
        print(f"\nUsage: {program} drive-letter [logical-sector-number]")
        print(f" {program} NT-device-path physical-sector-number")
        print(f" {program} full-win32-path")
        return 1
    analyzer = NTFSAnalyzer()
    # Stays 0 unless a lookup fails, so callers can detect that from the exit status.
    exit_code = 0
    try:
        if args.mode == 'volume':
            with NTFSVolume(args.drive_letter) as volume:
                if args.sectors:
                    # Analyze specific sectors
                    for sector in args.sectors:
                        print()
                        result = analyzer.analyze_sector(volume, sector)
                        if not result:
                            print(f"Could not locate file containing sector {sector}")
                            exit_code = 1
                            continue
                        record_num, path = result
                        print(f"Logical sector {sector} is in file {record_num}.")
                        print(path)
                        # Show detailed info
                        record_data = volume.read_mft_record(record_num)
                        if record_data:
                            record = NTFSFileRecord(record_num, record_data)
                            analyzer.display_file_info(volume, record, show_path=False)
                else:
                    # Show system files (dump all files would be too much for default)
                    print(f"Drive {args.drive_letter}:")
                    print("*" * 44)
                    for i in range(16):
                        record_data = volume.read_mft_record(i)
                        if not record_data:
                            continue
                        record = NTFSFileRecord(i, record_data)
                        if record.is_valid:
                            analyzer.display_file_info(volume, record)
        elif args.mode == 'physical':
            # Physical device mode
            for sector in args.sectors:
                if not analyzer.analyze_physical_sector(args.physical_device, sector):
                    exit_code = 1
        elif args.mode == 'file':
            args.file_path = os.path.abspath(args.file_path)
            if ':' not in args.file_path or not os.path.exists(args.file_path):
                print("Error: Full path must include drive letter and exist")
                return 1
            drive_letter = args.file_path[0]
            with NTFSVolume(drive_letter) as volume:
                record = analyzer.analyze_file(volume, args.file_path)
                print()
                analyzer.display_file_info(volume, record)
    except NTFSError as e:
        print(f"Error: {e}")
        return 1
    except Exception as e:
        print(f"Unexpected error: {e}")
        import traceback
        traceback.print_exc()
        return 1
    return exit_code
# Run as a script: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment