python watchdog python-watchdog oserror

Watchdog: OSError: [Errno 9] Bad file descriptor with watchdog


I am using Watchdog to observe when .hdf5 files are populated into a folder. When a file appears, it is tested to see whether it has finished being written. Once it has finished, the file path is returned and used. The problem is that when I run the main.py program, I get an error. First, here is the watchdog_file.py code:

import time
import traceback

import h5py
import queue
from typing import Union

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent


class NewFileHandler(FileSystemEventHandler):
    """h5 file creation handler for Watchdog.

    Paths of newly created ``.hdf5`` files are pushed onto ``file_queue`` so
    that another thread can pick them up.
    """

    def __init__(self):
        super().__init__()
        # Hand-off point between the observer's callback and the consumer.
        self.file_queue = queue.Queue()

    def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
        """Queue the path of a created ``.hdf5`` file; ignore everything else.

        Called by the Observer for every File/Directory created event.

        :param event: the creation event; only ``event.src_path`` is read.
        """
        # The previous check compared a 3-character slice with the
        # 5-character suffix ".hdf5", which could never match.
        if event.src_path.endswith(".hdf5"):
            self.file_queue.put(event.src_path)


class ObserverWrapper:
    """Encapsulated Observer boilerplate.

    Builds an ``Observer`` with a ``NewFileHandler`` scheduled on ``path`` and
    starts observing as soon as the wrapper is constructed.
    """

    def __init__(self, path: str, recursive=True):
        """
        :param path: directory to watch.
        :param recursive: passed through to ``Observer.schedule``.
        """
        self.path = path
        self.recursive = recursive

        self.observer = Observer()
        self.handler = NewFileHandler()

        self.observer.schedule(self.handler, path=path, recursive=recursive)

        self.start()

    def start(self):
        """
        Starts observing for filesystem events. Does not block.

        The constructor already calls this, so an explicit second call is a
        no-op rather than an attempt to start the running observer again.
        """
        # NOTE(review): relies on the observer exposing the threading.Thread
        # ``is_alive()`` API -- confirm for the installed watchdog version.
        if self.observer.is_alive():
            return
        self.observer.start()

    def stop(self):
        """
        Stops the observer and waits for its thread to finish.
        """

        self.observer.stop()
        self.observer.join()

    def wait_for_file(self):
        """
        Wait for a newly created file and return its path once it can be opened.

        Blocks until the handler queues a path, then repeatedly tries to open
        the file read-only with h5py until it succeeds.

        :return: the file path, or ``None`` if the file could not be opened
            within the retry budget or an unexpected error occurred.
        """

        # Upper bound on how long a single file is polled:
        # max_retry_count * retry_interval_seconds (about 30 seconds).
        max_retry_count = 30
        retry_interval_seconds = 1  # pause between two open attempts

        # wait for file to be added
        file_path = self.handler.file_queue.get(block=True)

        retry_count = 0
        while True:
            try:
                file = h5py.File(file_path, "r")
            except OSError:
                # Treated as "still being written": retry until the budget
                # is used up.
                if retry_count >= max_retry_count:
                    print(f"h5 file <{file_path}> reached max retry count, skipping")
                    return None
                retry_count += 1
                print(f"h5 file <{file_path}> is locked, retrying {retry_count}/{max_retry_count}")
                time.sleep(retry_interval_seconds)
            except Exception as err:
                # Anything else is not expected to fix itself by waiting.
                print(f"Got unexpected Error <{type(err).__name__}> while opening <{file_path}> ")
                traceback.print_exc()
                return None
            else:
                file.close()
                return file_path

And here is where it is called in main.py:

# ObserverWrapper.__init__ already starts the observer, so start() must not be
# called again here: an observer thread can only be started once.
self.listener = watchdog_search.ObserverWrapper("/home/path/to/folder")
self.on_finished_run(self.listener.wait_for_file()) #this line uses the finished .hdf5 file. It's not important to the error at hand. 

The error I am getting is the following:

Exception in thread Thread-4:
Traceback (most recent call last):
  File "/home/anaconda3/envs/img/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/anaconda3/envs/img/lib/python3.8/site-packages/watchdog/observers/inotify_buffer.py", line 88, in run
    inotify_events = self._inotify.read_events()
  File "/home/anaconda3/envs/img/lib/python3.8/site-packages/watchdog/observers/inotify_c.py", line 285, in read_events
    event_buffer = os.read(self._inotify_fd, event_buffer_size)
OSError: [Errno 9] Bad file descriptor

Solution

  • I solved the issue by re-writing the code:

    class NewFileHandler(FileSystemEventHandler):
        """Event handler that forwards every creation event to a queue."""

        def __init__(self, q, *args, **kwargs):
            # Any extra arguments are passed straight to the base handler.
            super().__init__(*args, **kwargs)
            # Queue supplied by the caller; this class only ever puts to it.
            self._q = q

        def on_created(self, event):
            """Put the unfiltered event object on the queue."""
            self._q.put(event)
    
    class Worker(QObject):
        """Watch a directory and announce ``.hdf5`` files once they can be opened.

        The constructor starts a watchdog observer whose handler feeds events
        into an internal queue; ``work`` consumes that queue indefinitely.
        """

        # Emitted with (full path, base name) of a file that opened successfully.
        new_file = pyqtSignal(str,str)

        def __init__(self, path):
            """
            :param path: directory to watch (recursively).
            """
            super().__init__()
            self._q = queue.Queue()
            # Keep a reference so the observer can be stopped/joined later.
            self._observer = Observer()
            handler = NewFileHandler(self._q)
            self._observer.schedule(handler, path=path, recursive=True)
            # starts a background thread! Thus we need to wait for the
            # queue to receive the events in work.
            self._observer.start()

        def work(self):
            """Consume queued events forever and emit ``new_file`` per finished file.

            For each ``created`` event whose path ends in ``.hdf5`` (case
            insensitive), the file is opened read-only until that succeeds or
            the retry budget is exhausted. This method never returns.
            """
            # Upper bound on verifying that a file is finished:
            # max_retry_count * retry_interval_seconds (about 3.5 seconds).
            max_retry_count = 350
            retry_interval_seconds = .01  # pause between two open attempts

            while True:
                event = self._q.get()
                if not (event.event_type == "created" and event.src_path.lower().endswith(".hdf5")):
                    continue

                retry_count = 0
                while True:
                    try:
                        # Opening (and closing) without error is the signal
                        # that the file is ready.
                        with h5py.File(event.src_path, "r"):
                            pass
                    except OSError:
                        if retry_count < max_retry_count:
                            retry_count += 1
                            print(f"h5 file <{event.src_path}> is locked, retrying {retry_count}/{max_retry_count}")
                            time.sleep(retry_interval_seconds)
                        else:
                            print(f"h5 file <{event.src_path}> reached max retry count, skipping")
                            break
                    except Exception as err:
                        print(f"Got unexpected Error <{type(err).__name__}> while opening <{event.src_path}> ")
                        traceback.print_exc()
                        # Give up on this file; retrying without a bound or a
                        # pause would spin forever on a persistent error.
                        break
                    else:
                        self.new_file.emit(event.src_path, os.path.basename(event.src_path))
                        break