this might be a dup but I couldn't find exactly what I was looking for. Feel free to link any previous answer.
I need to write a python script (bash also would be ok) that continuously watches a directory. When the content of this directory changes (because another program generates a new directory inside of it), I want to run automatically a command line that has the name of the newly created directory as an argument.
Example:
I need to watch directory /home/tmp/
:
the actual content of the directory is:
$ ls /home/tmp
Patient Patient2 Patient3
Suddenly, Patient4
dir arrives in /home/tmp
.
I want a code that runs automatically
$ my_command --target_dir /home/tmp/Patient4/
I hope I'm clear in explaining what I need. Thanks
The answer that i found works on Linux only, and it makes use of the pyinotify
wrapper. below is the wroking code:
class EventProcessor(pyinotify.ProcessEvent):
_methods = ["IN_CREATE",
# "IN_OPEN",
# "IN_ACCESS",
]
def process_generator(cls, method):
def _method_name(self, event):
if event.maskname=="IN_CREATE|IN_ISDIR":
print(f"Starting pipeline for {event.pathname}")
os.system(f"clearlung --single --automatic --base_dir {event.pathname} --target_dir CT " + \
f"--model {MODEL} --subroi --output_dir {OUTPUT} --tag 0 --history_path {HISTORY}")
pass
_method_name.__name__ = "process_{}".format(method)
setattr(cls, _method_name.__name__, _method_name)
for method in EventProcessor._methods:
process_generator(EventProcessor, method)
class PathWatcher():
"""Class to watch for changes"""
def __init__(self, path_to_watch) -> None:
"""Base constructor"""
self.path = path_to_watch
if not os.path.isdir(self.path):
raise FileNotFoundError()
def watch(self,):
"""Main method of the PathWatcher class"""
print(f"Waiting for changes in {self.path}...")
watch_manager = pyinotify.WatchManager()
event_notifier = pyinotify.Notifier(watch_manager, EventProcessor())
watch_this = os.path.abspath(self.path)
watch_manager.add_watch(watch_this, pyinotify.ALL_EVENTS)
event_notifier.loop()