Last active
June 30, 2022 14:13
-
-
Save rlaphoenix/5b433f572fc606a972188e50ee1b9e47 to your computer and use it in GitHub Desktop.
Win32RawDevice - A simple class that allows you to read a Disk device like you can on Linux with random-access read/seeks.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This was made to add support for raw block devices on Windows while keeping arbitrary .seek() and .read() support. | |
Accessing raw block devices on Windows limits seeks to be a multiple of their sector size. Effectively only able to | |
seek to specific sector numbers and only able to read a full sector. | |
This class adds a hacky but surprisingly stable method of using a raw block device as if those restrictions were not | |
in place. Interally it follows the restriction but returns only the data that was requested. | |
The goal was to add direct support in pycdlib. This has been done as of v1.13.0! | |
https://github.com/clalancette/pycdlib/blob/7e96f95624b97cfa4e59cc61b2b86ed411f9e3aa/pycdlib/utils.py#L441= | |
https://github.com/clalancette/pycdlib/pull/64 | |
NOTE: You will need administrator access to make the FSCTL call in get_handle() and it's also very much required. | |
This FSCTL call effectively unlocks the sector boundaries allowing you to access the full disc from top to bottom. | |
Without this change your full device reads will not match those from ImgBurn, AnyDVD, DVD Decrypter and so on. | |
A workaround is to install MakeMKV and let it install the `cdarbsvc` (CdRom Device Arbiter service) which effectively | |
removes that requirement. Sadly I do not know how it's doing that so I could not port it to Python. | |
""" | |
import math | |
import os | |
import struct | |
from typing import Optional, Tuple | |
class Win32RawDevice(object):
    """
    Read and seek a Windows raw device handle at arbitrary byte offsets.

    The real file pointer is only ever moved to sector-aligned offsets and
    data is only ever read one whole sector at a time. A separate "spoofed"
    position (`self.position`) records where the caller asked to be, and
    read() slices the requested bytes out of the sectors it fetched.

    The class also queries the device geometry (sector size, total size) and
    issues FSCTL_ALLOW_EXTENDED_DASD_IO on the handle.

    Can be used as a context manager; the handle is closed on exit.
    """

    def __init__(self, target):
        # type: (str) -> None
        """
        Open `target` and query its geometry.

        Parameters:
            target: Device name, e.g. `E:` or `PhysicalDriveN`.

        Raises:
            RuntimeError: If pywin32 is unavailable or no handle was obtained.
        """
        if not win32_has_pywin32:  # type: ignore
            raise RuntimeError("The 'pywin32' module is missing, which is needed to access raw devices on Windows")

        self.target = target
        self.sector_size = None
        self.disc_size = None
        self.position = 0
        self.handle = self.get_handle()
        try:
            self.geometry = self.get_geometry()
        except Exception:
            # do not leak the freshly opened handle if the geometry query fails
            self.dispose()
            raise
        # the last two geometry fields are Bytes Per Sector and Disk Size
        self.sector_size = self.geometry[-2]
        self.disc_size = self.geometry[-1]

    def __enter__(self):
        return self

    def __exit__(self, *_, **__):
        self.dispose()

    def __len__(self):
        # type: () -> int
        """Total device size in bytes, as reported by get_geometry()."""
        return self.geometry[-1]

    def _ensure_handle(self):
        # type: () -> None
        """Re-open the device handle if there is currently none."""
        if not self.handle:
            self.handle = self.get_handle()

    def dispose(self):
        # type: () -> None
        """
        Close the win32 handle opened by get_handle.

        Safe to call more than once: the handle is cleared after closing, so
        a second call is a no-op and a later seek/read re-opens the device.
        """
        handle = self.handle
        if handle and handle != win32file.INVALID_HANDLE_VALUE:  # type: ignore
            win32file.CloseHandle(handle)  # type: ignore
        self.handle = None

    def get_target(self):
        # type: () -> str
        """
        Get the device path for the target.

        The target can be `E:` or `PhysicalDriveN`; the Win32 device prefix
        is prepended unless the target already starts with it.
        """
        target = self.target
        if not target.startswith("\\\\.\\"):
            target = "\\\\.\\" + target
        return target

    def get_handle(self):
        # type: () -> int
        """
        Get a direct handle to the raw target, and unlock its IO capabilities.

        Raises:
            RuntimeError: If CreateFile returned an invalid handle.
        """
        handle = win32file.CreateFile(  # type: ignore
            # https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
            self.get_target(),  # target
            win32con.MAXIMUM_ALLOWED,  # type: ignore
            win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,  # type: ignore
            None,  # security attributes
            win32con.OPEN_EXISTING,  # type: ignore
            win32con.FILE_ATTRIBUTE_NORMAL,  # type: ignore
            None  # template file
        )
        if handle == win32file.INVALID_HANDLE_VALUE:  # type: ignore
            raise RuntimeError("Failed to obtain device handle...")
        try:
            # elevate accessible sectors, without this the last 5 sectors
            # (in the original author's case) will not be readable
            win32file.DeviceIoControl(handle, winioctlcon.FSCTL_ALLOW_EXTENDED_DASD_IO, None, None)  # type: ignore
        except Exception:
            # the caller never receives the handle on failure, so close it here
            win32file.CloseHandle(handle)  # type: ignore
            raise
        return handle

    def get_geometry(self):
        # type: () -> Tuple[int, int, int, int, int, int, int]
        """
        Retrieves information about the physical disk's geometry.
        https://docs.microsoft.com/en-us/windows/win32/api/winioctl/ns-winioctl-disk_geometry_ex

        Returns a tuple of:
            Cylinders-Lo
            Cylinders-Hi
            Media Type
            Tracks Per Cylinder
            Sectors Per Track
            Bytes Per Sector
            Disk Size
        """
        geometry_ex = win32file.DeviceIoControl(  # type: ignore
            self.handle,  # handle
            winioctlcon.IOCTL_DISK_GET_DRIVE_GEOMETRY_EX,  # type: ignore
            None,  # in buffer
            32  # out buffer
        )
        # six 32-bit fields followed by the 64-bit disk size; "<" selects
        # standard sizes with no padding, so the layout is always 32 bytes
        return struct.unpack_from("<6LQ", geometry_ex)

    def tell(self):
        # type: () -> int
        """Get current (spoofed) position."""
        return self.position

    def _tell(self):
        # type: () -> int
        """Get current real (sector-aligned) position of the file pointer."""
        self._ensure_handle()
        return win32file.SetFilePointer(self.handle, 0, win32file.FILE_CURRENT)  # type: ignore

    def seek(self, offset, whence=os.SEEK_SET):
        # type: (int, int) -> int
        """
        Seek at any point in the stream, in an aligned way.

        The real file pointer is moved to the closest sector boundary at or
        below the requested position; the requested position itself is only
        recorded and is honoured by the next read().

        Parameters:
            offset: Byte offset relative to `whence`.
            whence: os.SEEK_SET, os.SEEK_CUR or os.SEEK_END.

        Returns:
            The new (spoofed) absolute position.

        Raises:
            ValueError: If `whence` is not one of the three os.SEEK_* values.
            IOError: If the target position is negative, or the file pointer
                did not end up on the expected aligned offset.
        """
        if whence == os.SEEK_SET:
            base = 0
        elif whence == os.SEEK_CUR:
            base = self.tell()
        elif whence == os.SEEK_END:
            # read the size directly; len() would overflow on 32-bit builds
            base = self.geometry[-1]
        else:
            raise ValueError("Invalid whence value: %r" % (whence,))

        to = base + offset
        if to < 0:
            raise IOError("Cannot seek to a negative position (%d)" % to)

        closest = self.align(to)  # get as close as we can while being aligned
        self._ensure_handle()
        pos = win32file.SetFilePointer(self.handle, closest, win32file.FILE_BEGIN)  # type: ignore
        if pos != closest:
            raise IOError("Seek was not precise...")

        self.position = to  # not actually at this location, read will deal with it
        return to

    def read(self, size=-1):
        # type: (int) -> bytes
        """
        Read any amount of bytes in the stream, in an aligned way.

        Whole sectors are read from the real (aligned) position until the
        requested range is covered, then only the requested bytes are
        returned and the spoofed position advances by `size`.

        Parameters:
            size: Number of bytes to read. A negative value (the default) or
                None reads from the current position to the end of the device.

        Returns:
            The requested bytes; b"" when there is nothing to read.

        Raises:
            IOError: If ReadFile reports an error or returns a short sector.
        """
        if size is None or size < 0:
            size = self.geometry[-1] - self.tell()
        if size <= 0:
            return b""

        self._ensure_handle()
        sector_size = self.geometry[-2]
        real = self._tell()
        end = self.tell() + size
        # distance between the aligned real pointer and the requested position
        offset = abs(real - self.tell())

        chunks = []
        while real < end:
            res, data = win32file.ReadFile(self.handle, sector_size, None)  # type: ignore
            if res != 0:
                raise IOError("An error occurred: %d %s" % (res, data))
            if len(data) < sector_size:
                raise IOError("Read %d less bytes than requested..." % (sector_size - len(data)))
            chunks.append(data)
            real += sector_size  # ReadFile advanced the pointer by one full sector

        # seek to the position wanted + size read, which will then be re-aligned
        self.seek(end)
        return b"".join(chunks)[offset:offset + size]

    def align(self, size, to=None):
        # type: (int, Optional[int]) -> int
        """
        Align size down to the closest multiple of `to`.

        Uses integer floor division so very large offsets stay exact.

        Parameters:
            size: The value to align.
            to: The alignment; defaults to the device's bytes-per-sector.

        Examples:
            align(513, to=512)  -> 512
            align(1023, to=512) -> 512
            align(1026, to=512) -> 1024
            align(12, to=10)    -> 10
        """
        if to is None:
            to = self.geometry[-2]  # logical bytes per sector value
        return (size // to) * to
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment