pythonairflowairflow-2.xairflow-taskflow

Airflow sensor unable to access context variable


I'm trying to build a sensor that reads the dag parameters (that you can change when you trigger dag with config) to know how long to wait.

from airflow.decorators import dag, task, task_group
from datetime import date, datetime, timedelta
import re

params = {
    "time":"8h"
}

def parse_time(time_str):
    regex = re.compile(r'^((?P<days>[\.\d]+?)d)?((?P<hours>[\.\d]+?)h)?((?P<minutes>[\.\d]+?)m)?((?P<seconds>[\.\d]+?)s)?$')
    parts = regex.match(time_str)
    if parts is None: return timedelta()
    time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
    return timedelta(**time_params)

@dag(
    dag_id="test",
    start_date=datetime(2023, 5, 1),
    schedule_interval =  None,
    catchup=False,
    default_args={"retries":0},
    params=params,
    tags=["test","debug"],
)
def test():
    @task.sensor(
        task_id=f"run_after",
        poke_interval=60 * 5,
        timeout=60 * 60 * 24 * 3,
        mode="reschedule"
    )
    def run_after(**context):
        run_after = context["params"].get("time","0h")
        print(run_after)
        target_time = parse_time(run_after)
        time_since_midnight = datetime.now() - datetime.strptime(context["data_interval_end"].strftime("%Y%m%d"),"%Y%m%d")
        return time_since_midnight > target_time
    
    t=run_after()
test()

It seems the context variable can't be accessed in the sensor (empty dict)... I have no troubles accessing it in a normal task. Am I doing this wrong ? is there a workaround ? (I guess I could access the params in another task and send the data to the sensor through XComs but it makes the dag more complex and it doesn't seem to be the right way to do this).

Thanks for your help


Solution

  • Ok so I could access the context with

    from airflow.operators.python import get_current_context
    

    and then inside the task

    def run_after():
        context=get_current_context()
    

    So problem solved, anyway it would be great if the context was also available in the kwargs.

    Marking the question as solved.