I am building a DAG that waits for filesystem changes and then runs some analysis on newly appearing or modified files, for which I'm using a FileSensor. The path I'm monitoring contains both a Jinja template and a glob wildcard. When a file is found, I would like to provide its absolute path to the subsequent callbacks and tasks. The file's metadata will then be compared against some data structure to determine whether it needs to be processed.
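As a rough illustration of that last step, the sketch below compares a file's size and modification time against a dictionary of previously seen signatures. The function name `needs_processing`, the `(size, mtime)` signature, and the dictionary layout are all hypothetical choices, not something the question specifies; it only needs a concrete path, which is exactly what the sensor does not hand over.

```python
import os
import tempfile


def needs_processing(path: str, seen: dict) -> bool:
    """Hypothetical check: True if the file is new or its size/mtime changed.

    ``seen`` maps a path to the (size, mtime) signature recorded last time and
    is updated in place whenever a new signature is observed.
    """
    st = os.stat(path)
    signature = (st.st_size, st.st_mtime)
    if seen.get(path) == signature:
        return False
    seen[path] = signature
    return True


# Usage with a throwaway file.
seen: dict = {}
with tempfile.TemporaryDirectory() as tmp:
    data_file = os.path.join(tmp, "sales.20240101.csv.gz")
    with open(data_file, "wb") as fh:
        fh.write(b"abc")
    first = needs_processing(data_file, seen)   # True: never seen before
    second = needs_processing(data_file, seen)  # False: unchanged
    with open(data_file, "ab") as fh:
        fh.write(b"def")
    third = needs_processing(data_file, seen)   # True: size grew from 3 to 6 bytes
```

In a real deployment `seen` would have to live somewhere persistent (a database, a file, an Airflow Variable), since each task run starts with fresh Python state.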
The problem: how do I "exfiltrate" the found file's path from the sensor? I looked at the source code of FileSensor, but it only logs the file it finds without storing the path anywhere. Is there any way to use the path template and/or the context to reconstruct the path without additional filesystem queries?
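The difficulty can be shown without Airflow: once the Jinja part is rendered, what remains is a glob pattern, and a pattern describes a set of names rather than a single path. The sketch below uses the standard-library `fnmatch` module with made-up file names (the `sales` source and the time suffixes are hypothetical) to show that several candidates can satisfy the same rendered pattern, so the template and context alone cannot say which one the sensor saw.

```python
from fnmatch import fnmatch

# Pattern as it might look after Jinja rendering (hypothetical source "sales").
pattern = "/basepath/20240101/sales.20240101*.csv.gz"

# Hypothetical directory contents: the pattern alone does not say
# which of the matching files the sensor actually found.
candidates = [
    "/basepath/20240101/sales.20240101.csv.gz",
    "/basepath/20240101/sales.20240101_0600.csv.gz",
    "/basepath/20240101/sales.20240101_1200.csv.gz",
    "/basepath/20240101/other.20240101.csv.gz",
]

matches = [path for path in candidates if fnmatch(path, pattern)]
print(matches)  # the three "sales" files; "other" is excluded
```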
I thought of a couple of workarounds, but before going down either path, I wanted to make sure it is actually necessary. My ideas are:
PythonOperator.
BashOperator + echo, then re-query the filesystem.
Here's a simplified outline of my configuration:
import os
from functools import partial

from airflow.sensors.filesystem import FileSensor
from pendulum import duration

# <DAG initialization code>
...
path_template: str = os.path.join(
    "/basepath/",
    "{{ data_interval_start.format('YYYYMMDD') }}",
    f"{source}.{{{{ data_interval_start.format('YYYYMMDD') }}}}*.csv.gz"
)
fs: FileSensor = FileSensor(
    task_id=f"{source}_data_sensor",
    filepath=path_template,
    poke_interval=int(duration(minutes=5).total_seconds()),
    timeout=int(duration(hours=1, minutes=30).total_seconds()),
    mode="reschedule",
    pre_execute=log_execution,
    on_success_callback=partial(log_found_file, path_template),
)
fs >> convert(source) >> analyze_data(source)
where log_found_file is defined as follows:
import os

from airflow.utils.context import Context

def log_found_file(data_path_template: str, ctx: Context) -> None:
    """Logs the discovery of a data file."""
    data_path = f(data_path_template, ctx)  # <<<<<<<<<<<<<<< Need help with this
    stats: os.stat_result = os.stat(data_path)
    # On Unix, st_ctime is the last metadata (inode) change time, not the creation time.
    logger.success(
        f"Detected data file {data_path} "
        f"of size {stats.st_size}; "
        f"created on {stats.st_ctime}; "
        f"and last modified on {stats.st_mtime}."
    )
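To see what the sensor and the callback actually receive, the f-string escaping and the callback binding can be checked in isolation. The sketch below uses a hypothetical source name `sales`, replaces the Airflow `Context` with a plain dict, and swaps in a stub (`log_found_file_stub`, hypothetical) that just returns its arguments. It only illustrates that `{{{{`/`}}}}` inside an f-string leave literal `{{`/`}}` for Jinja, and that `partial` places the bound template in front of the context Airflow passes to the callback.

```python
import os
from functools import partial

source = "sales"  # hypothetical source name

# Inside an f-string, "{{{{" and "}}}}" become literal "{{" and "}}",
# so the Jinja expression survives for Airflow to render later.
path_template = os.path.join(
    "/basepath/",
    "{{ data_interval_start.format('YYYYMMDD') }}",
    f"{source}.{{{{ data_interval_start.format('YYYYMMDD') }}}}*.csv.gz",
)
print(path_template)


def log_found_file_stub(data_path_template: str, ctx: dict) -> tuple:
    """Hypothetical stand-in for log_found_file: returns what it received."""
    return data_path_template, ctx["run_id"]


# Airflow invokes on_success_callback with the context as its only argument;
# partial() supplies the template as the first positional argument.
callback = partial(log_found_file_stub, path_template)
result = callback({"run_id": "manual__2024-01-01"})  # plain dict stands in for Context
print(result)
```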
I'm working with Airflow 2.8.0 in case it matters.
Below is an implementation of one of the workarounds mentioned in the question:
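from glob import glob

def log_found_file(data_path_template: str, ctx: Context) -> None:
    """Logs the discovery of a data file."""
    # Render the Jinja parts of the template with the task's own context...
    glob_str: str = ctx.get("task").render_template(data_path_template, ctx)
    # ...then re-query the filesystem. glob() returns matches in arbitrary order,
    # and indexing [0] raises IndexError if nothing matches.
    data_path: str = glob(glob_str)[0]
    ...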
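If the pattern can match more than one file, `glob(glob_str)[0]` picks one arbitrarily. One possible tightening, sketched below with only the standard library, is to keep regular files and choose the most recently modified match. The helper name `resolve_single_match` and the "newest wins" policy are assumptions, and this does not guarantee picking the same file the sensor logged; it only makes the choice deterministic and fails with a clear error when nothing matches.

```python
import glob
import os
import tempfile


def resolve_single_match(pattern: str) -> str:
    """Return the most recently modified regular file matching ``pattern``.

    Raises FileNotFoundError if nothing matches.
    """
    # Keep regular files only; the pattern could in principle match directories.
    matches = [p for p in glob.glob(pattern) if os.path.isfile(p)]
    if not matches:
        raise FileNotFoundError(f"No file matches {pattern!r}")
    # glob.glob() returns matches in arbitrary order, so choose deterministically.
    return max(matches, key=os.path.getmtime)


# Usage with throwaway files (names and timestamps are made up).
with tempfile.TemporaryDirectory() as tmp:
    older = os.path.join(tmp, "sales.20240101_0600.csv.gz")
    newer = os.path.join(tmp, "sales.20240101_1200.csv.gz")
    for path in (older, newer):
        open(path, "wb").close()
    os.utime(older, (1_000_000, 1_000_000))  # (atime, mtime) in epoch seconds
    os.utime(newer, (2_000_000, 2_000_000))
    # glob.escape() guards against metacharacters in the temp directory name.
    found = resolve_single_match(os.path.join(glob.escape(tmp), "sales.20240101*.csv.gz"))
    print(os.path.basename(found))  # sales.20240101_1200.csv.gz
```

Inside the callback, the rendered `glob_str` would be passed in place of the temporary pattern.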