airflowairflow-taskflow

Is it possible to use an operator within a function that is returning an operator, in Airflow?


I am currently writing/creating an external table in databricks.

To do so, I am using the DatabricksSqlOperator.

I have a function that returns this Dbks Operator with all the needed details in it.

@dag(...)
.
.
.
def create_external():
    return DatabricksSqlOperator(...)

However, to improve the quality of the code, I want to query the schema table in dbks before creating the external table (to check for schema changes).

I would like to know if it would be possible to use an operator inside the same function, or if it has to be an extra "task" in my DAG. Since its an intermediate task, it will be very ugly to have everywhere.

@dag(...)
.
.
.
def create_external():

    query_result = DatabricksSqlOperator(...)
    if query_result:
        queries= [query1, query2]
    else:
        queries= [query1]

    return DatabricksSqlOperator(query=queries)

Solution

  • It needs to be a separate task plus a branch operator that are aligned in a sequence like this:

    query1 >> test_query >> branch >> query2
    

    If it is a repeating pattern you can wrap it in a task group:

    def create_external():
        def decide(**kwargs):
            result = kwargs['ti'].xcom_pull(task_ids='test')
            return 'query2' if result == "xxx" else None  # adjust the test
    
        @task_group()
        def databricks_group():
            query1 = DatabricksSqlOperator(...)
            test_query = DatabricksSqlOperator(...)
            branch = BranchPythonOperator(
                task_id='branch',
                provide_context=True,
                python_callable=decide
            )
            query2 = DatabricksSqlOperator(...)
    
        return databricks_group()
    

    Notes: