
Python - Windows Raw Disk unable to read final sectors


When I access a raw disk on Windows via Python's open(), I cannot read the last 10240 bytes (the last 5 sectors at 2048 bytes/sector).

When I dump the disc by other means and compare the two images, I can see that the missing data is not simply empty. The first of the missing sectors contains a UDF Anchor Tag with related metadata. The sectors after it are entirely blank.
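To check whether a sector from the reference image really starts with a UDF anchor, something like the following could be used. This is only a sketch: the name is_udf_anchor is hypothetical, and it assumes the standard 16-byte ECMA-167 descriptor tag, where tag identifier 2 marks an Anchor Volume Descriptor Pointer and byte 4 holds a checksum over the other tag bytes.

```python
import struct

SECTOR_SIZE = 2048      # logical sector size assumed for DVD media
TAG_ID_ANCHOR = 2       # Anchor Volume Descriptor Pointer (ECMA-167)


def is_udf_anchor(sector: bytes) -> bool:
    """Return True if the sector starts with a plausible anchor descriptor tag."""
    if len(sector) < 16:
        return False
    tag = sector[:16]
    # Bytes 0-1 hold the tag identifier as a little-endian 16-bit integer.
    (tag_id,) = struct.unpack_from("<H", tag, 0)
    # Byte 4 is the sum modulo 256 of tag bytes 0-3 and 5-15.
    checksum = (sum(tag[:4]) + sum(tag[5:16])) % 256
    return tag_id == TAG_ID_ANCHOR and checksum == tag[4]
```

Running it over the last five 2048-byte sectors of the reference image would show which of them carries the anchor. An all-zero sector returns False because its tag identifier is 0.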

This is how I dumped the disc contents:

# Open both files in one with-statement so the output is flushed and closed too.
with open(r"\\.\D:", "rb") as f, open("test.iso", "wb") as out:
    while True:
        data = f.read(512)
        if len(data) == 0:
            break
        out.write(data)

If I take that same open() object and seek to the very end of the disc, the seek succeeds, so the sectors are reachable at least in terms of seeking. If I then seek back 10240 bytes and call f.read(...), it returns b'' (an empty result) rather than raising an error. The read size makes no difference. I tried no argument, 1, 12, 255, 512, 2048, 999999, and so on.

Another StackOverflow answer on a different (but related) question reported similar findings on Enhanced Audio Discs, but there seems to have been no follow-up discussion.

I have tested this on multiple DVDs from various studios and creators, all in great condition, and it occurs on every one of them.

Example reproducing code:

import os
from wmi import WMI


DISC_LETTER = "D:"

c = WMI()
disc_info = next(iter(c.Win32_CDROMDrive(Drive=DISC_LETTER)), None)
if not disc_info:
    raise RuntimeError("Disc %s not found..." % DISC_LETTER)

disc_size = int(disc_info.size)
disc_size += 10240  # WMI also reports the size without the final 10240 bytes, but they exist!

f = open(r"\\.\%s" % DISC_LETTER, "rb")
f.seek(disc_size)
if f.tell() == disc_size:
    print("Seeked to the end of the disc...")
f.seek(-10240, os.SEEK_CUR)
if f.tell() == disc_size - (2048 * 5):
    print("Seeked 5 sectors before the end of the disc...")
data = f.read(2048 * 5)
print("Data read (len: %d): %r" % (len(data), data))

Any ideas on why this happens would be great, as I have tried everything I could think of.


Solution

  • It seems this occurs because open(r'\\.\N:') opens the device with restricted boundaries.

    My solution was to open the disc through the Win32 API instead of open(): CreateFile to obtain a handle, then DeviceIoControl with FSCTL_ALLOW_EXTENDED_DASD_IO to lift the restriction.

    import win32con
    import win32file
    import winioctlcon

    handle = win32file.CreateFile(
        r"\\.\D:",
        win32con.MAXIMUM_ALLOWED,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_ATTRIBUTE_NORMAL,
        None
    )
    if handle == win32file.INVALID_HANDLE_VALUE:
        raise RuntimeError("Failed to obtain device handle...")
    win32file.DeviceIoControl(handle, winioctlcon.FSCTL_ALLOW_EXTENDED_DASD_IO, None, None)
    

    From here I can use ReadFile and SetFilePointer as replacements for read and seek respectively.
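As a rough illustration of that replacement, the last sectors could be read from such a handle as sketched below. The helper names tail_offset and read_tail are hypothetical. The sketch assumes the handle came from the CreateFile call above, that pywin32 is installed, and that the disc size is already known and is a multiple of the sector size.

```python
def tail_offset(disc_size, sectors=5, sector_size=2048):
    """Byte offset at which the last `sectors` sectors of the disc begin."""
    offset = disc_size - sectors * sector_size
    if offset < 0 or offset % sector_size:
        raise ValueError("disc size must be a sector multiple that covers the tail")
    return offset


def read_tail(handle, disc_size, sectors=5, sector_size=2048):
    """Read the last `sectors` sectors through the raw Win32 handle."""
    import win32file  # pywin32; imported here so tail_offset works without it

    offset = tail_offset(disc_size, sectors, sector_size)
    # SetFilePointer stands in for seek(); it returns the new position.
    pos = win32file.SetFilePointer(handle, offset, win32file.FILE_BEGIN)
    if pos != offset:
        raise IOError("Seek was not precise...")
    # ReadFile stands in for read(); it returns (error code, data).
    res, data = win32file.ReadFile(handle, sectors * sector_size, None)
    if res != 0:
        raise IOError("ReadFile failed with code %d" % res)
    return data
```

Both the offset and the read length are kept as multiples of the sector size, since raw device reads need to be sector-aligned.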

    I also wrote a class that wraps all of this and lets you read and seek freely without having to worry about sector alignment.

    import math
    import os
    import struct

    import win32con
    import win32file
    import winioctlcon


    class Win32Device:
        """
        Class to read and seek a Windows Raw Device IO object without bother.
        It deals with getting the full size, allowing full access to all sectors,
        and alignment with the disc's sector size.
        
        Author: PHOENiX <pragma.exe@gmail.com>
        License: Free, enjoy! This should be a thing open() does by default.
        """
    
        def __init__(self, target):
            # type: (str) -> None
            self.target = target
            self.sector_size = None
            self.disc_size = None
            self.position = 0
    
            self.handle = self.get_handle()
            self.geometry = self.get_geometry()
    
        def __enter__(self):
            return self
    
        def __exit__(self, *_, **__):
            self.dispose()
    
        def __len__(self) -> int:
            # DiskSize is 64-bit: combine its low and high 32-bit halves
            return self.geometry[-2] | (self.geometry[-1] << 32)
    
        def dispose(self):
            if self.handle != win32file.INVALID_HANDLE_VALUE:
                win32file.CloseHandle(self.handle)
    
        def get_target(self):
            # type: () -> str
            """Get the device path. Target can be `E:` or `PhysicalDriveN`."""
            target = self.target
            if not target.startswith("\\\\.\\"):
                # prepend the \\.\ device prefix (assign, do not append to the name)
                target = "\\\\.\\" + target
            return target
    
        def get_handle(self):
            # type: () -> int
            """Get a direct handle to the raw UNC target, and unlock its IO capabilities."""
            handle = win32file.CreateFile(
                # https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
                self.get_target(),  # target
                win32con.MAXIMUM_ALLOWED,  # desired access
                win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,  # share mode, write needed
                None,  # security attributes
                win32con.OPEN_EXISTING,  # creation disposition
                win32con.FILE_ATTRIBUTE_NORMAL,  # flags and attributes
                None  # template file
            )
            if handle == win32file.INVALID_HANDLE_VALUE:
                raise RuntimeError("Failed to obtain device handle...")
            # elevate accessible sectors, without this the last 5 sectors (in my case) will not be readable
            win32file.DeviceIoControl(handle, winioctlcon.FSCTL_ALLOW_EXTENDED_DASD_IO, None, None)
            return handle
    
        def get_geometry(self):
            # type: () -> tuple[int, ...]
            """
            Retrieves information about the physical disk's geometry.
            https://learn.microsoft.com/en-us/windows/win32/api/winioctl/ns-winioctl-disk_geometry_ex
    
            Returns a tuple of eight 32-bit values (the first 32 bytes of the struct):
                Cylinders-Lo
                Cylinders-Hi
                Media Type
                Tracks Per Cylinder
                Sectors Per Track
                Bytes Per Sector
                Disk Size-Lo
                Disk Size-Hi
            """
            return struct.unpack("8L", win32file.DeviceIoControl(
                self.handle,  # handle
                winioctlcon.IOCTL_DISK_GET_DRIVE_GEOMETRY_EX,  # ioctl api
                b"",  # in buffer
                32  # out buffer
            ))
    
        def tell(self):
            # type: () -> int
            """Get current (spoofed) position."""
            return self.position
    
        def _tell(self):
            # type: () -> int
            """Get current real position."""
            if not self.handle:
                self.handle = self.get_handle()
            return win32file.SetFilePointer(self.handle, 0, win32file.FILE_CURRENT)
    
        def seek(self, offset, whence=os.SEEK_SET):
            # type: (int, int) -> int
            """Seek at any point in the stream, in an aligned way."""
            if whence == os.SEEK_CUR:
                whence = self.tell()
            elif whence == os.SEEK_END:
                whence = len(self)
    
            to = whence + offset
            closest = self.align(to)  # get as close as we can while being aligned
    
            if not self.handle:
                self.handle = self.get_handle()
    
            pos = win32file.SetFilePointer(self.handle, closest, win32file.FILE_BEGIN)
            if pos != closest:
                raise IOError("Seek was not precise...")
    
            self.position = to  # not actually at this location, read will deal with it
            return to
    
        def read(self, size=-1):
            # type: (int) -> bytes
            """Read any amount of bytes in the stream, in an aligned way."""
            if not self.handle:
                self.handle = self.get_handle()
    
            sector_size = self.geometry[-3]
            if size is None or size < 0:
                size = len(self) - self.tell()  # no size given: read to the end
            # distance between the real (aligned) position and the wanted position
            offset = abs(self._tell() - self.tell())
    
            chunks = []
            while self._tell() < self.tell() + size:
                res, data = win32file.ReadFile(self.handle, sector_size, None)
                if res != 0:
                    raise IOError(f"An error occurred: {res} {data}")
                if len(data) < sector_size:
                    raise IOError(f"Read {sector_size - len(data)} less bytes than requested...")
                chunks.append(data)
            # seek to the position wanted + size read, which will then be re-aligned
            self.seek(self.tell() + size)
    
            return b"".join(chunks)[offset:offset + size]
    
        def align(self, size, to=None):
            # type: (int, Optional[int]) -> int
            """
            Align size down to the nearest multiple of `to`.
            Examples:
                align(513, to=512)   -> 512
                align(1023, to=512)  -> 512
                align(1026, to=512)  -> 1024
                align(12, to=10)     -> 10
            """
            if not to:
                to = self.geometry[-3]  # logical bytes per sector value
            # integer division stays exact for offsets too large for float precision
            return (size // to) * to
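The alignment arithmetic that seek and read rely on can also be looked at in isolation. The sketch below is not part of the class. The names aligned_span and read_unaligned are hypothetical, and read_aligned stands for any callable that can only read whole sectors from a sector-aligned offset.

```python
def aligned_span(position, size, sector_size=2048):
    """Return (start, length, skip) of the sector-aligned read covering a request."""
    start = (position // sector_size) * sector_size            # round down
    end = -(-(position + size) // sector_size) * sector_size   # round up
    return start, end - start, position - start


def read_unaligned(read_aligned, position, size, sector_size=2048):
    """Serve an arbitrary (position, size) read from an aligned-only reader."""
    start, length, skip = aligned_span(position, size, sector_size)
    # Read whole sectors, then cut out just the bytes that were asked for.
    return read_aligned(start, length)[skip:skip + size]
```

For example, a 10-byte read at position 2050 becomes one full-sector read starting at 2048, with the first 2 bytes skipped. This matches what the class does by keeping a spoofed position next to the real, aligned file pointer.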