airflow, databricks, azure-databricks

Airflow runs the job twice and reports an error, even though it ran successfully in Databricks


I’m encountering an issue with an Airflow DAG that runs daily. When I trigger the DAG manually, it executes successfully, and all tasks complete without any issues. However, when the DAG runs according to the schedule, the tasks execute successfully within Databricks, but Airflow still reports an error.

Here’s the relevant part of my DAG:

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from airflow.providers.slack.notifications.slack import SlackNotifier

# SLACK_FAILURE_MESSAGE and get_latest_job_id() are defined elsewhere in the project.

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

dag = DAG(
    'bronze_to_silver_and_silver_to_gold',
    default_args=default_args,
    description='DAG to run the steps bronze to silver and silver to gold.',
    schedule_interval='0 8 * * *',  
    start_date=datetime(2024, 1, 1),  
    catchup=False,
    max_active_runs=1,
    concurrency=1,
)

slack_failure_notifier = SlackNotifier(
    slack_conn_id="slack_conn",
    text=SLACK_FAILURE_MESSAGE,
    channel="pipeline-status",
)

norden_complete_process = DatabricksRunNowOperator(
    task_id='complete_process',
    databricks_conn_id='databricks_default',
    job_id=get_latest_job_id(),
    python_params=[
        "name", "",
        "tables", ""
    ],
    dag=dag,
    on_failure_callback=slack_failure_notifier,
)

norden_complete_process

Job JSON:

{
  "job_id": ,
  "creator_user_name": "",
  "run_as_user_name": "",
  "run_as_owner": false,
  "settings": {
    "name": "",
    "email_notifications": {},
    "webhook_notifications": {},
    "timeout_seconds": 0,
    "max_concurrent_runs": 1,
    "tasks": [
      {
        "task_key": "Bronze_to_silver",
        "run_if": "ALL_SUCCESS",
        "python_wheel_task": {
          "package_name": "data",
          "entry_point": "bronze_to_silver",
          "parameters": [
            "--client-name",
            "example_client",
            "--client-tables",
            "example_table"
          ]
        },
        "existing_cluster_id": "",
        "libraries": [
          {
            "whl": "dbfs:/artifacts/3.0.0-py3-none-any.whl"
          }
        ],
        "timeout_seconds": 0,
        "email_notifications": {}
      },
      {
        "task_key": "Silver_to_gold",
        "depends_on": [
          {
            "task_key": "Bronze_to_silver"
          }
        ],
        "run_if": "ALL_SUCCESS",
        "python_wheel_task": {
          "package_name": "data",
          "entry_point": "silver_to_gold",
          "parameters": [
            "--client-name",
            "example_client"
          ]
        },
        "existing_cluster_id": "",
        "libraries": [
          {
            "whl": "dbfs:/artifacts/3.0.0-py3-none-any.whl"
          }
        ],
        "timeout_seconds": 0,
        "email_notifications": {}
      }
    ],
    "format": "MULTI_TASK"
  },
  "created_time": 1724685763834
}

Problem:

•   When manually triggered, the DAG completes successfully.
•   When scheduled, the tasks appear to work fine in Databricks (Kubernetes pods are running normally), but Airflow still marks the task with an error, showing that one task was skipped while the other completed successfully.

I’ve noticed there are two attempts before the task is marked as failed, and I’m unsure whether this is related to my issue. In the Airflow UI, I see an error message (see the screenshots below) even though everything appears to run fine in Databricks.
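One way to check whether the schedule is actually firing twice is to list the job's most recent runs through the Databricks Jobs 2.1 API. This is only a diagnostic sketch; the workspace URL, token, and job_id below are placeholders, not values from my setup:

import requests

HOST = "https://<workspace>.azuredatabricks.net"  # placeholder workspace URL
TOKEN = "<personal-access-token>"                 # placeholder token

# List the five most recent runs of the job (Jobs API 2.1).
resp = requests.get(
    f"{HOST}/api/2.1/jobs/runs/list",
    headers={"Authorization": f"Bearer {TOKEN}"},
    params={"job_id": 123, "limit": 5},  # 123 is a placeholder job_id
)
resp.raise_for_status()

for run in resp.json().get("runs", []):
    # Two runs with near-identical start_time would point to a duplicated trigger.
    print(run["run_id"], run["start_time"], run["state"].get("result_state"))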

Questions:

1.  Why is Airflow reporting an error for scheduled runs when everything works fine during manual execution?
2.  Is there a way to prevent the DAG from retrying, or to correctly capture the success status from Databricks? (A sketch of what I'm considering follows below.)
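For context, this is how I would pin the run-status handling explicitly. wait_for_termination and polling_period_seconds are existing DatabricksRunNowOperator parameters; wait_for_termination=True is already the provider's default, and the polling value shown is just a guess:

# Sketch: make the operator's polling behaviour and the no-retry policy explicit.
norden_complete_process = DatabricksRunNowOperator(
    task_id='complete_process',
    databricks_conn_id='databricks_default',
    job_id=get_latest_job_id(),
    wait_for_termination=True,   # poll the Databricks run until a terminal state
    polling_period_seconds=30,   # how often Airflow polls the run state
    retries=0,                   # task-level override so this task never retries
    dag=dag,
)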

Any help or guidance would be greatly appreciated!

Screenshots:

[Screenshot: one task skipped, the other succeeded]

[Screenshot: the error on the skipped task]

[Screenshot: the Airflow schedule]

[Screenshot: the two attempts showing the error]


Solution

  • The issue was the replica count in the Kubernetes deployment. The deployment was configured with multiple replicas, so two requests were sent each time the CronJob fired. Setting the replica count to one resolved it, and the DAG now behaves as expected.
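For anyone hitting the same thing, the fix was roughly a one-line change in the Deployment manifest (the name below is a placeholder for whatever component sends the trigger):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-scheduler   # placeholder name
spec:
  replicas: 1               # was > 1, which made each replica send its own run-now request
  # selector / template omitted for brevity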