python airflow pytest

While testing airflow task with pytest, I got an error


While testing an Airflow task with pytest, I got an error.

# tests/conftest.py

import datetime
import pytest
from airflow.models import DAG

@pytest.fixture
def test_dag():
    """Provide a DAG named ``test_dag`` scheduled once per day.

    The default args give every task the owner ``airflow`` and a date
    window running from 2025-04-05 to 2025-04-06.
    """
    window_start = datetime.datetime(2025, 4, 5)
    window_end = datetime.datetime(2025, 4, 6)
    task_defaults = {
        "owner": "airflow",
        "start_date": window_start,
        "end_date": window_end,
    }
    daily = datetime.timedelta(days=1)
    return DAG("test_dag", default_args=task_defaults, schedule=daily)

# tests/test_instance_context.py

import datetime

from airflow.models import BaseOperator
from airflow.models.dag import DAG
from airflow.utils import timezone

class SampleDAG(BaseOperator):
    """Operator that pushes its two templated date fields to XCom.

    The constructor arguments are stored under the private names
    ``_start_date`` / ``_end_date``, which are the names listed in
    ``template_fields``.
    """

    template_fields = ("_start_date", "_end_date")

    def __init__(self, start_date, end_date, **kwargs):
        """Store the (possibly Jinja-templated) date values.

        Args:
            start_date: Value or template string kept in ``_start_date``.
            end_date: Value or template string kept in ``_end_date``.
            **kwargs: Forwarded unchanged to ``BaseOperator.__init__``.
        """
        super().__init__(**kwargs)
        self._start_date = start_date
        self._end_date = end_date

    def execute(self, context):
        """Push both templated fields to XCom and return the context.

        Args:
            context: Task context; ``context["ti"]`` must provide
                ``xcom_push``.

        Returns:
            The ``context`` object that was passed in.
        """
        # Push the fields declared in template_fields. The un-prefixed
        # self.start_date / self.end_date are not the values stored by
        # __init__, so pushing them would never expose the templates.
        context["ti"].xcom_push(key="start_date", value=self._start_date)
        context["ti"].xcom_push(key="end_date", value=self._end_date)
        # NOTE(review): Airflow presumably tries to store execute()'s return
        # value as an XCom too; the context may not be serializable -- confirm
        # whether returning it is intended.
        return context


def test_execute(test_dag: DAG):
    """Run ``SampleDAG`` over the DAG's date window and check the task's dates.

    Args:
        test_dag: DAG fixture whose ``default_args`` supply the
            ``start_date`` / ``end_date`` window passed to ``task.run``.
    """
    task = SampleDAG(
        task_id="test",
        start_date="{{ prev_ds }}",
        end_date="{{ ds }}",
        dag=test_dag,
    )

    # ignore_ti_state=True: task instances left over from an earlier run
    # (e.g. in the 'failed' state) otherwise make run() skip execution with
    # "Dependencies not met ... dependency 'Task Instance State' FAILED".
    task.run(
        start_date=test_dag.default_args["start_date"],
        end_date=test_dag.default_args["end_date"],
        ignore_ti_state=True,
    )

    expected_start_date = datetime.datetime(2025, 4, 5, tzinfo=timezone.utc)
    expected_end_date = datetime.datetime(2025, 4, 6, tzinfo=timezone.utc)
    # NOTE(review): task.start_date / task.end_date are presumably the
    # scheduling attributes taken from the DAG's default_args, not the
    # rendered "{{ prev_ds }}" / "{{ ds }}" templates -- confirm this is
    # what the test is meant to verify.
    assert task.start_date == expected_start_date
    assert task.end_date == expected_end_date

The test passes, but I get the following log output:

tests/test_instance_context.py [2025-04-26T12:51:18.289+0000] {taskinstance.py:2604} INFO - Dependencies not met for <TaskInstance: test_dag.test manual__2025-04-05T00:00:00+00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state.
[2025-04-26T12:51:18.303+0000] {taskinstance.py:2604} INFO - Dependencies not met for <TaskInstance: test_dag.test manual__2025-04-06T00:00:00+00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state.   
.

I want to test task.run to see the difference between task.run and task.execute. When I pass Jinja variables, Airflow automatically renders them when the task is executed through the run method.

So, I want to verify that prev_ds, ds, start_date, and end_date are rendered successfully. But I got the error above.


Solution

  • The error occurs because your SampleDAG operator is failing during execution, which causes subsequent runs to fail due to the task's "failed" state. Let's fix this step by step:

    Key Issues in Your Code:

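    1. Attribute Mismatch: You're using self.start_date and self.end_date in execute(), but the templated values are stored in self._start_date and self._end_date. (self.start_date and self.end_date are BaseOperator's own scheduling attributes, so the rendered templates are never pushed.)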

    2. Template Rendering: You're passing Jinja templates ({{ prev_ds }}, {{ ds }}) but not properly rendering them before execution.

    3. Task Execution Context: The run() method needs proper context for template rendering.

    class SampleDAG(BaseOperator):
        """Operator that publishes its two templated date fields to XCom."""

        # Private attribute names holding the values given to __init__.
        template_fields = ("_start_date", "_end_date")

        def __init__(self, start_date, end_date, **kwargs):
            """Keep the two date values; pass all other kwargs to BaseOperator."""
            super().__init__(**kwargs)
            self._start_date, self._end_date = start_date, end_date

        def execute(self, context):
            """Push ``_start_date`` and ``_end_date`` to XCom, then return ``context``."""
            ti = context["ti"]
            outgoing = (
                ("start_date", self._start_date),
                ("end_date", self._end_date),
            )
            for xcom_key, xcom_value in outgoing:
                ti.xcom_push(key=xcom_key, value=xcom_value)
            return context
    
    def test_execute(test_dag: DAG):
        """Run ``SampleDAG`` across the DAG's configured date window.

        Args:
            test_dag: DAG fixture whose ``default_args`` supply the
                ``start_date`` / ``end_date`` window passed to ``task.run``.
        """
        task = SampleDAG(
            task_id="test",
            start_date="{{ prev_ds }}",
            end_date="{{ ds }}",
            dag=test_dag,
        )

        # Date window to run, taken from the DAG's default args
        execution_date = test_dag.default_args["start_date"]
        end_date = test_dag.default_args["end_date"]

        # BaseOperator.run() takes no execution_date / run_id arguments; the
        # runs are derived from the start_date..end_date window.
        # ignore_ti_state=True re-runs task instances left in the 'failed'
        # state by an earlier run instead of skipping them with
        # "Dependencies not met".
        task.run(
            start_date=execution_date,
            end_date=end_date,
            ignore_first_depends_on_past=True,
            ignore_ti_state=True,
        )