I'm curious how Airflow sensors behave when a file has not been completely copied/loaded to the target data storage. As an example: we have a filesystem, and a sensor checks for files in it. We are copying a large file to a folder, and it takes some time. Will the Airflow sensor consume the incomplete file, or will it wait until the file is fully loaded?
I'm really looking for an answer and haven't found anything similar.
You can see the answer in the code of FileSensor: poke only checks that the path exists (os.path.isfile); it does not check whether the file has finished being written.
So the answer is that the operating system is responsible for it. In a test I made while downloading a big file on a Mac, for example, the check returned True while the file was still downloading.
The idea is to give the big file a temporary extension while you copy it, and rename it to the real one after the copy has finished.
def poke(self, context: Context):
    hook = FSHook(self.fs_conn_id)
    basepath = hook.get_path()
    full_path = os.path.join(basepath, self.filepath)
    self.log.info("Poking for file %s", full_path)

    for path in glob(full_path, recursive=self.recursive):
        if os.path.isfile(path):
            mod_time = datetime.datetime.fromtimestamp(os.path.getmtime(path)).strftime("%Y%m%d%H%M%S")
            self.log.info("Found File %s last modified: %s", str(path), mod_time)
            return True

        for _, _, files in os.walk(path):
            if len(files) > 0:
                return True
    return False
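
The temporary-extension idea above could look roughly like this on the side that writes the file. This is only a sketch, not part of Airflow: the function name copy_then_publish and the ".part" suffix are made up for illustration, and it assumes the temporary file and the final file live in the same directory (so the rename stays on one filesystem) and that the sensor's filepath pattern does not match the temporary name.

```python
import os
import shutil


def copy_then_publish(src: str, dst: str, tmp_suffix: str = ".part") -> str:
    """Copy src next to dst under a temporary name, then rename it to dst.

    Hypothetical helper: the name and the ".part" suffix are assumptions.
    """
    tmp_path = dst + tmp_suffix
    # The slow part: while this runs, only "<dst>.part" exists on disk.
    shutil.copyfile(src, tmp_path)
    # The final name appears only after the copy has finished.
    os.replace(tmp_path, dst)
    return dst
```

With this in place, a FileSensor poking for something like "incoming/*.csv" would not see "data.csv.part" during the copy, and would only find "data.csv" once the rename has happened.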