I'm trying to build a sensor that reads the DAG params (which you can override when you trigger the DAG with config) to know how long to wait.
from airflow.decorators import dag, task, task_group
from datetime import date, datetime, timedelta
import re

params = {
    "time": "8h"
}

def parse_time(time_str):
    regex = re.compile(r'^((?P<days>[\.\d]+?)d)?((?P<hours>[\.\d]+?)h)?((?P<minutes>[\.\d]+?)m)?((?P<seconds>[\.\d]+?)s)?$')
    parts = regex.match(time_str)
    if parts is None:
        return timedelta()
    time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
    return timedelta(**time_params)

@dag(
    dag_id="test",
    start_date=datetime(2023, 5, 1),
    schedule_interval=None,
    catchup=False,
    default_args={"retries": 0},
    params=params,
    tags=["test", "debug"],
)
def test():
    @task.sensor(
        task_id="run_after",
        poke_interval=60 * 5,
        timeout=60 * 60 * 24 * 3,
        mode="reschedule"
    )
    def run_after(**context):
        run_after = context["params"].get("time", "0h")
        print(run_after)
        target_time = parse_time(run_after)
        time_since_midnight = datetime.now() - datetime.strptime(context["data_interval_end"].strftime("%Y%m%d"), "%Y%m%d")
        return time_since_midnight > target_time

    t = run_after()

test()
It seems the context can't be accessed in the sensor (it is an empty dict), even though I have no trouble accessing it in a normal task. Am I doing this wrong? Is there a workaround? (I guess I could read the params in another task and pass the value to the sensor through XComs, but that makes the DAG more complex and doesn't seem like the right way to do this.)
Thanks for your help
OK, so I was able to access the context with
from airflow.operators.python import get_current_context
and then inside the task
def run_after():
    context = get_current_context()
So the problem is solved. Still, it would be great if the context were also available in the kwargs.
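For anyone landing here later, this is a minimal sketch of the poke logic pulled out into plain functions, so it can be checked without a scheduler. It is only an illustration: should_run is a hypothetical helper name, and the duration regex is a slightly stricter variant of the one in the question (each number must be digits with an optional fractional part), not the original pattern.

```python
import re
from datetime import datetime, time, timedelta

# Stricter variant of the question's pattern (an assumption, not the original):
# each number is digits with an optional fractional part, so float() cannot
# fail on input such as "..h".
_DURATION = re.compile(
    r"^(?:(?P<days>\d+(?:\.\d+)?)d)?"
    r"(?:(?P<hours>\d+(?:\.\d+)?)h)?"
    r"(?:(?P<minutes>\d+(?:\.\d+)?)m)?"
    r"(?:(?P<seconds>\d+(?:\.\d+)?)s)?$"
)


def parse_time(time_str):
    """Turn strings such as "8h" or "1d2h30m" into a timedelta.

    Returns a zero timedelta when the string does not match the pattern.
    """
    match = _DURATION.match(time_str)
    if match is None:
        return timedelta()
    return timedelta(**{k: float(v) for k, v in match.groupdict().items() if v})


def should_run(params, data_interval_end, now):
    """Hypothetical helper: True once `now` is later than midnight of the
    data interval end date plus the configured delay."""
    delay = parse_time(params.get("time", "0h"))
    # Same effect as the strftime/strptime round trip in the question:
    # keep the date, drop the time of day (and any timezone info).
    midnight = datetime.combine(data_interval_end.date(), time.min)
    return now - midnight > delay


# Inside the sensor body, the wiring would then look roughly like this:
#     context = get_current_context()
#     return should_run(context["params"], context["data_interval_end"], datetime.now())
```

Keeping the comparison in a plain function means the sensor body only has to fetch the context and return the result, and the time arithmetic can be unit-tested on its own.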
Marking the question as solved.