python python-3.x ctypes ntdll nt-native-api

Python NtQueryDirectoryFile (File information structure)


I've written a simple test script to list the files in a selected directory. It does not use FindFirstFile, only the native API. When I run the script under an API monitor, the call is reported as returning STATUS_SUCCESS. My file-information buffer is c_buffer(1024); I deliberately did not use a Unicode buffer so that I could see the raw data.

So the call to NtQueryDirectoryFile itself succeeds. But when I write the c_buffer in raw mode to the console to see the files in the directory, the output is not structured. I created a FILE_DIRECTORY_INFORMATION structure, but either it does not work on Windows 7 x86 or there is a problem in my code.

My question: which FILE_DIRECTORY_INFORMATION structure layout (or variant of it) should I use on Windows 7 x86?

from ctypes import *

# Open the directory C:\a and keep its Win32 handle for the query below.
# The numeric arguments are 0x80000000 (GENERIC_READ), 3 (OPEN_EXISTING)
# and 0x02000000 (FILE_FLAG_BACKUP_SEMANTICS); the share mode, security
# attributes and template handle are all passed as 0.
hFile = windll.kernel32.CreateFileW("C:\\a",0x80000000,0,0,3,0x02000000,0)

class Info(Union):
    """Union holding either a status code or a pointer-sized value."""

    # Both members overlay the same storage, starting at offset 0.
    _fields_ = [
        ('STATUS', c_long),
        ('Pointer', c_ulong),
    ]


class io_stat(Structure):
    """I/O status block handed to NtQueryDirectoryFile by this script."""

    _fields_ = [
        # Status/pointer union comes first.
        ('Stat', Info),
        # Second member of the status block.
        ('Information', c_ulong),
    ]


class FILE_OBJECT(Structure):
    """Layout of one directory record as declared by this script.

    The fields are declared in the same order as FILE_DIRECTORY_INFORMATION:
    two 32-bit header values, six 64-bit values, two more 32-bit values and
    finally the start of the file name.
    """

    _fields_ = [
        ('Next', c_ulong),
        ('FileIndex', c_ulong),
        ('ctime', c_longlong),
        ('lat', c_longlong),
        ('wtime', c_longlong),
        ('ch', c_longlong),
        ('Endogfile', c_longlong),
        ('allo', c_longlong),
        ('Fileattr', c_ulong),
        ('Filenalen', c_ulong),
        # Only two characters are declared here for the name.
        ('Filename', c_wchar * 2),
    ]

# Status block and a 1024-byte raw output buffer for the directory query.
b = io_stat()
a = c_buffer(1024)

# Single query on the directory handle. The arguments after the buffer size
# are: 1 = FileDirectoryInformation class, 0 = ReturnSingleEntry off,
# None = no FileName filter, 0 = RestartScan off. The returned status is
# not inspected here.
windll.ntdll.NtQueryDirectoryFile(hFile,0,0,0,byref(b),byref(a),sizeof(a), 1,0,None,0)

# Dump the whole buffer as unparsed bytes.
print(a.raw)

Not optimized.


Solution

  • NtQueryDirectoryFile should be called in a loop until it returns STATUS_NO_MORE_FILES. If either the returned status is STATUS_BUFFER_OVERFLOW or the status is successful (non-negative) with the status block Information as 0, then double the buffer size and try again. For each successful pass, copy the FILE_DIRECTORY_INFORMATION records out of the buffer. Each record has to be sized to include the FileName. You've reached the end when the Next field is 0.

    The following example subclasses FILE_DIRECTORY_INFORMATION as a DirEntry class that has a listbuf class method to list the records in a queried buffer. It skips the "." and ".." entries. It uses this class in an ntlistdir function that lists the DirEntry records for a given directory via NtQueryDirectoryFile. It supports passing an open file descriptor as the path argument, which is like how os.listdir works on POSIX systems.

    ctypes definitions

    import os
    import msvcrt
    import ctypes
    
    from ctypes import wintypes
    
    # Library handles used by every prototype and call below.
    ntdll = ctypes.WinDLL('ntdll')
    # use_last_error=True lets ctypes.get_last_error() report the error code
    # of a failed kernel32 call.
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
    
    def NtError(status):
        """Build the exception object for a failed native call.

        The NTSTATUS value is translated to a Win32 error code with
        RtlNtStatusToDosError and wrapped by ctypes.WinError. The exception
        is returned, not raised, so callers write ``raise NtError(status)``.
        """
        return ctypes.WinError(ntdll.RtlNtStatusToDosError(status))
    
    # NTSTATUS is a signed LONG. Routing the hexadecimal literals through
    # NTSTATUS(...).value yields the value as read back from that signed
    # type, so the constants compare equal to statuses returned by functions
    # whose restype is NTSTATUS.
    NTSTATUS = wintypes.LONG
    STATUS_BUFFER_OVERFLOW = NTSTATUS(0x80000005).value
    STATUS_NO_MORE_FILES = NTSTATUS(0x80000006).value
    STATUS_INFO_LENGTH_MISMATCH = NTSTATUS(0xC0000004).value
    
    # Win32 error code, handle sentinel and CreateFileW flag values.
    ERROR_DIRECTORY = 0x010B
    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
    GENERIC_READ = 0x80000000
    FILE_SHARE_READ = 1
    OPEN_EXISTING = 3
    FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
    FILE_ATTRIBUTE_DIRECTORY = 0x0010
    
    # Information-class selectors passed to the NtQuery* functions.
    FILE_INFORMATION_CLASS = wintypes.ULONG
    FileDirectoryInformation = 1
    FileBasicInformation = 4
    
    # Parameter types that are only ever passed opaquely here, plus a
    # pointer-sized unsigned integer.
    LPSECURITY_ATTRIBUTES = wintypes.LPVOID
    PIO_APC_ROUTINE = wintypes.LPVOID
    ULONG_PTR = wintypes.WPARAM
    
    class UNICODE_STRING(ctypes.Structure):
        """Counted wide-character string: two USHORT lengths and a pointer."""

        _fields_ = (
            ('Length', wintypes.USHORT),
            ('MaximumLength', wintypes.USHORT),
            ('Buffer', wintypes.LPWSTR),
        )

    # Pointer type used for the optional FileName filter argument.
    PUNICODE_STRING = ctypes.POINTER(UNICODE_STRING)
    
    class IO_STATUS_BLOCK(ctypes.Structure):
        """Completion status block filled in by the NtQuery* calls.

        ``Status`` and ``Pointer`` share storage through an anonymous union,
        followed by the pointer-sized ``Information`` member.
        """

        class _STATUS(ctypes.Union):
            _fields_ = (
                ('Status', NTSTATUS),
                ('Pointer', wintypes.LPVOID),
            )

        # Listing the union as anonymous exposes Status/Pointer directly as
        # attributes of IO_STATUS_BLOCK instances.
        _anonymous_ = ('_Status',)
        _fields_ = (
            ('_Status', _STATUS),
            ('Information', ULONG_PTR),
        )

    PIO_STATUS_BLOCK = ctypes.POINTER(IO_STATUS_BLOCK)
    
    # Prototype for NtQueryInformationFile. Declaring restype as NTSTATUS
    # makes the returned status a signed value, so failures test as
    # ``status < 0``.
    ntdll.NtQueryInformationFile.restype = NTSTATUS
    ntdll.NtQueryInformationFile.argtypes = (
        wintypes.HANDLE,        # In  FileHandle
        PIO_STATUS_BLOCK,       # Out IoStatusBlock
        wintypes.LPVOID,        # Out FileInformation
        wintypes.ULONG,         # In  Length
        FILE_INFORMATION_CLASS) # In  FileInformationClass
    
    # Prototype for NtQueryDirectoryFile; the argument order below is the
    # order in which every call in this file must pass its arguments.
    ntdll.NtQueryDirectoryFile.restype = NTSTATUS
    ntdll.NtQueryDirectoryFile.argtypes = (
        wintypes.HANDLE,        # In     FileHandle
        wintypes.HANDLE,        # In_opt Event
        PIO_APC_ROUTINE,        # In_opt ApcRoutine
        wintypes.LPVOID,        # In_opt ApcContext
        PIO_STATUS_BLOCK,       # Out    IoStatusBlock
        wintypes.LPVOID,        # Out    FileInformation
        wintypes.ULONG,         # In     Length
        FILE_INFORMATION_CLASS, # In     FileInformationClass
        wintypes.BOOLEAN,       # In     ReturnSingleEntry
        PUNICODE_STRING,        # In_opt FileName
        wintypes.BOOLEAN)       # In     RestartScan
    
    # Prototype for CreateFileW. restype is HANDLE so the full handle value
    # is returned and can be compared with INVALID_HANDLE_VALUE.
    kernel32.CreateFileW.restype = wintypes.HANDLE
    kernel32.CreateFileW.argtypes = (
        wintypes.LPCWSTR,      # In     lpFileName
        wintypes.DWORD,        # In     dwDesiredAccess
        wintypes.DWORD,        # In     dwShareMode
        LPSECURITY_ATTRIBUTES, # In_opt lpSecurityAttributes
        wintypes.DWORD,        # In     dwCreationDisposition
        wintypes.DWORD,        # In     dwFlagsAndAttributes
        wintypes.HANDLE)       # In_opt hTemplateFile
    
    class FILE_BASIC_INFORMATION(ctypes.Structure):
        """Output record for the FileBasicInformation query class.

        Four LARGE_INTEGER timestamps followed by the attribute flags.
        """

        _fields_ = (
            ('CreationTime', wintypes.LARGE_INTEGER),
            ('LastAccessTime', wintypes.LARGE_INTEGER),
            ('LastWriteTime', wintypes.LARGE_INTEGER),
            ('ChangeTime', wintypes.LARGE_INTEGER),
            ('FileAttributes', wintypes.ULONG),
        )
    
    class FILE_DIRECTORY_INFORMATION(ctypes.Structure):
        """One variable-length directory record.

        The declared layout covers the fixed header plus a one-character
        placeholder for the name. The real name occupies ``FileNameLength``
        bytes starting at the placeholder's offset, so an instance must be
        backed by enough memory to hold it.
        """

        _fields_ = (
            ('_Next', wintypes.ULONG),
            ('FileIndex', wintypes.ULONG),
            ('CreationTime', wintypes.LARGE_INTEGER),
            ('LastAccessTime', wintypes.LARGE_INTEGER),
            ('LastWriteTime', wintypes.LARGE_INTEGER),
            ('ChangeTime', wintypes.LARGE_INTEGER),
            ('EndOfFile', wintypes.LARGE_INTEGER),
            ('AllocationSize', wintypes.LARGE_INTEGER),
            ('FileAttributes', wintypes.ULONG),
            ('FileNameLength', wintypes.ULONG),
            ('_FileName', wintypes.WCHAR * 1),
        )

        @property
        def FileName(self):
            """Return the entry name read from the memory after the header."""
            # FileNameLength is a byte count; convert it to characters.
            char_count = self.FileNameLength // ctypes.sizeof(wintypes.WCHAR)
            name_type = wintypes.WCHAR * char_count
            name_addr = ctypes.addressof(self) + type(self)._FileName.offset
            return name_type.from_address(name_addr).value
    

    DirEntry and ntlistdir

    class DirEntry(FILE_DIRECTORY_INFORMATION):
        """A standalone copy of one FILE_DIRECTORY_INFORMATION record."""

        def __repr__(self):
            return '<{} {!r}>'.format(self.__class__.__name__, self.FileName)

        @classmethod
        def listbuf(cls, buf):
            """Copy the records out of a queried buffer.

            ``buf`` is a writable ctypes buffer that NtQueryDirectoryFile has
            filled with at least one record. Records are chained through
            their ``_Next`` byte offsets, and a zero ``_Next`` marks the last
            one. The "." and ".." entries and records with an empty name are
            skipped.

            Returns a list of instances of this class, each owning its own
            memory so that it stays valid after ``buf`` is reused.
            """
            result = []
            # The fixed header is everything before the variable-length name.
            # sizeof(cls) must not be used for this: it also counts the
            # one-character placeholder and the trailing alignment padding,
            # which would copy bytes beyond the end of the record.
            header_size = cls._FileName.offset
            declared_size = ctypes.sizeof(cls)
            offset = 0
            while True:
                fdi = cls.from_buffer(buf, offset)
                if fdi.FileNameLength and fdi.FileName not in ('.', '..'):
                    size = header_size + fdi.FileNameLength
                    cfdi = cls()
                    # ctypes.resize() rejects sizes below the declared size,
                    # and a fresh instance is already that large.
                    if size > declared_size:
                        ctypes.resize(cfdi, size)
                    ctypes.memmove(ctypes.byref(cfdi), ctypes.byref(fdi), size)
                    result.append(cfdi)
                if not fdi._Next:
                    break
                offset += fdi._Next
            return result
    
    def isdir(path):
        """Return True if ``path`` refers to a directory.

        ``path`` is either a file-system path, which is delegated to
        os.path.isdir, or an integer C file descriptor, whose underlying
        handle is queried for its basic information. A descriptor with no
        usable handle, or a failed query, yields False.
        """
        if isinstance(path, int):
            try:
                handle = msvcrt.get_osfhandle(path)
            except IOError:
                return False
            status_block = IO_STATUS_BLOCK()
            basic = FILE_BASIC_INFORMATION()
            status = ntdll.NtQueryInformationFile(
                handle, ctypes.byref(status_block), ctypes.byref(basic),
                ctypes.sizeof(basic), FileBasicInformation)
            # Negative NTSTATUS values are failures.
            if status < 0:
                return False
            return bool(basic.FileAttributes & FILE_ATTRIBUTE_DIRECTORY)
        return os.path.isdir(path)
    
    def ntlistdir(path=None):
        """List a directory through NtQueryDirectoryFile.

        ``path`` may be a directory path, an integer C file descriptor open
        on a directory, or None for the current working directory. A
        descriptor passed in by the caller is left open; a handle opened
        here is closed before returning.

        Returns a list of DirEntry records, without "." and "..".

        Raises the OSError built by ctypes.WinError when the directory cannot
        be opened, when the target is not a directory (ERROR_DIRECTORY), or
        when the query fails with an error status.
        """
        result = []

        if path is None:
            path = os.getcwd()

        if isinstance(path, int):
            close = False
            fd = path
            hFile = msvcrt.get_osfhandle(fd)
        else:
            close = True
            hFile = kernel32.CreateFileW(path, GENERIC_READ, FILE_SHARE_READ,
                        None, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS, None)
            if hFile == INVALID_HANDLE_VALUE:
                raise ctypes.WinError(ctypes.get_last_error())
            try:
                # From here on the descriptor owns the handle, so closing
                # the descriptor closes the handle too.
                fd = msvcrt.open_osfhandle(hFile, os.O_RDONLY)
            except Exception:
                # No descriptor took ownership: release the handle directly
                # so it does not leak.
                kernel32.CloseHandle(wintypes.HANDLE(hFile))
                raise

        try:
            if not isdir(fd):
                raise ctypes.WinError(ERROR_DIRECTORY)
            iosb = IO_STATUS_BLOCK()
            info = (ctypes.c_char * 4096)()
            # Start from the first entry even if the caller's descriptor was
            # enumerated before; later passes resume where the last stopped.
            restart = True
            while True:
                status = ntdll.NtQueryDirectoryFile(hFile, None, None, None,
                            ctypes.byref(iosb), ctypes.byref(info),
                            ctypes.sizeof(info), FileDirectoryInformation,
                            False, None, restart)
                if (status == STATUS_BUFFER_OVERFLOW or
                    iosb.Information == 0 and status >= 0):
                    # Nothing usable was returned: retry with twice the room.
                    info = (ctypes.c_char * (ctypes.sizeof(info) * 2))()
                elif status == STATUS_NO_MORE_FILES:
                    break
                elif status >= 0:
                    result.extend(DirEntry.listbuf(info))
                    restart = False
                else:
                    raise NtError(status)
        finally:
            if close:
                os.close(fd)

        return result
    

    Example

    # Demo: when run as a script, print one line per entry found in the
    # directory named by sys.exec_prefix.
    if __name__ == '__main__':
        import sys
        for entry in ntlistdir(sys.exec_prefix):
            print(entry)
    

    Output:

    <DirEntry 'DLLs'>
    <DirEntry 'include'>
    <DirEntry 'Lib'>
    <DirEntry 'libs'>
    <DirEntry 'LICENSE.txt'>
    <DirEntry 'NEWS.txt'>
    <DirEntry 'python.exe'>
    <DirEntry 'python.pdb'>
    <DirEntry 'python3.dll'>
    <DirEntry 'python36.dll'>
    <DirEntry 'python36.pdb'>
    <DirEntry 'python36_d.dll'>
    <DirEntry 'python36_d.pdb'>
    <DirEntry 'python3_d.dll'>
    <DirEntry 'pythonw.exe'>
    <DirEntry 'pythonw.pdb'>
    <DirEntry 'pythonw_d.exe'>
    <DirEntry 'pythonw_d.pdb'>
    <DirEntry 'python_d.exe'>
    <DirEntry 'python_d.pdb'>
    <DirEntry 'Scripts'>
    <DirEntry 'tcl'>
    <DirEntry 'Tools'>
    <DirEntry 'vcruntime140.dll'>