While testing airflow with pytest, i got an Error.
# tests/conftest.py
import datetime
import pytest
from airflow.models import DAG
@pytest.fixture
def test_dag():
return DAG(
"test_dag",
default_args={
"owner": "airflow",
"start_date": datetime.datetime(2025, 4, 5),
"end_date": datetime.datetime(2025, 4, 6)
},
schedule=datetime.timedelta(days=1)
)
# tests/test_instance_context.py
import datetime
from airflow.models import BaseOperator
from airflow.models.dag import DAG
from airflow.utils import timezone
class SampleDAG(BaseOperator):
template_fields = ("_start_date", "_end_date")
def __init__(self, start_date, end_date, **kwargs):
super().__init__(**kwargs)
self._start_date = start_date
self._end_date = end_date
def execute(self, context):
context["ti"].xcom_push(key="start_date", value=self.start_date)
context["ti"].xcom_push(key="end_date", value=self.end_date)
return context
def test_execute(test_dag: DAG):
task = SampleDAG(
task_id="test",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
dag=test_dag
)
task.run(
start_date=test_dag.default_args["start_date"],
end_date=test_dag.default_args["end_date"]
)
expected_start_date = datetime.datetime(2025, 4, 5, tzinfo=timezone.utc)
expected_end_date = datetime.datetime(2025, 4, 6, tzinfo=timezone.utc)
assert task.start_date == expected_start_date
assert task.end_date == expected_end_date
Test code is passed, but I got an issue here.
tests/test_instance_context.py [2025-04-26T12:51:18.289+0000] {taskinstance.py:2604} INFO - Dependencies not met for <TaskInstance: test_dag.test manual__2025-04-05T00:00:00+00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state.
[2025-04-26T12:51:18.303+0000] {taskinstance.py:2604} INFO - Dependencies not met for <TaskInstance: test_dag.test manual__2025-04-06T00:00:00+00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state.
.
I want to test task.run to see difference between task.run
and task.execute
.
when I passed jinja variables, then airflow automatically rendering the variables by run method.
So, I want to see prev_ds, ds, start_date, end_date is successfully rendered. But I got an error above..
The error occurs because your SampleDAG
operator is failing during execution, which causes subsequent runs to fail due to the task's "failed" state. Let's fix this step by step:
Attribute Mismatch: You're using self.start_date
and self.end_date
in execute()
but these attributes don't exist (you defined self._start_date
and self._end_date
).
Template Rendering: You're passing Jinja templates ({{ prev_ds }}
, {{ ds }}
) but not properly rendering them before execution.
Task Execution Context: The run()
method needs proper context for template rendering.
class SampleDAG(BaseOperator):
template_fields = ("_start_date", "_end_date")
def __init__(self, start_date, end_date, **kwargs):
super().__init__(**kwargs)
self._start_date = start_date
self._end_date = end_date
def execute(self, context):
# Use the correct attribute names that match __init__
context["ti"].xcom_push(key="start_date", value=self._start_date)
context["ti"].xcom_push(key="end_date", value=self._end_date)
return context
def test_execute(test_dag: DAG):
task = SampleDAG(
task_id="test",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
dag=test_dag
)
# Create proper execution context
execution_date = test_dag.default_args["start_date"]
end_date = test_dag.default_args["end_date"]
# Run with proper context
task.run(
start_date=execution_date,
end_date=end_date,
execution_date=execution_date, # Needed for template rendering
run_id=f"test_run_{execution_date.isoformat()}",
ignore_first_depends_on_past=True
)