I am currently creating an external table in Databricks.
To do so, I am using the DatabricksSqlOperator.
I have a function that returns this Databricks operator with all the needed details in it.
@dag(...)
.
.
.
def create_external():
    return DatabricksSqlOperator(...)
However, to improve the quality of the code, I want to query the schema table in Databricks before creating the external table (to check for schema changes).
I would like to know whether it is possible to use an operator inside the same function, or whether it has to be an extra "task" in my DAG. Since it's an intermediate task, it would be ugly to have it everywhere.
@dag(...)
.
.
.
def create_external():
    query_result = DatabricksSqlOperator(...)
    if query_result:
        queries = [query1, query2]
    else:
        queries = [query1]
    return DatabricksSqlOperator(sql=queries)
It needs to be a separate task plus a branch operator, aligned in a sequence like this:
query1 >> test_query >> branch >> query2
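Spelled out as a flat DAG, that could look roughly like this (connection id, SQL strings, and DAG settings are placeholders, not your actual values):

from datetime import datetime

from airflow.decorators import dag
from airflow.operators.python import BranchPythonOperator
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator


@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def external_table_dag():
    query1 = DatabricksSqlOperator(
        task_id='query1', databricks_conn_id='databricks_default', sql='...'
    )
    # the schema check; its result lands in XCom
    test_query = DatabricksSqlOperator(
        task_id='test_query', databricks_conn_id='databricks_default', sql='...'
    )

    def decide(**kwargs):
        result = kwargs['ti'].xcom_pull(task_ids='test_query')
        # returning None makes the branch skip query2 entirely
        return 'query2' if result else None  # adjust the test

    branch = BranchPythonOperator(task_id='branch', python_callable=decide)
    query2 = DatabricksSqlOperator(
        task_id='query2', databricks_conn_id='databricks_default', sql='...'
    )

    query1 >> test_query >> branch >> query2


external_table_dag()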
If it is a repeating pattern, you can wrap it in a task group:
from airflow.decorators import task_group
from airflow.operators.python import BranchPythonOperator
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator


def create_external():
    def decide(**kwargs):
        # task ids inside a task group are prefixed with the group id
        result = kwargs['ti'].xcom_pull(task_ids='databricks_group.test_query')
        return 'databricks_group.query2' if result == "xxx" else None  # adjust the test

    @task_group()
    def databricks_group():
        query1 = DatabricksSqlOperator(task_id='query1', sql='...')
        test_query = DatabricksSqlOperator(task_id='test_query', sql='...')
        branch = BranchPythonOperator(
            task_id='branch',
            python_callable=decide,  # context is passed automatically in Airflow 2
        )
        query2 = DatabricksSqlOperator(task_id='query2', sql='...')
        query1 >> test_query >> branch >> query2

    return databricks_group()
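Hypothetical usage, assuming the create_external above is importable: the group drops into any DAG as a single node, which addresses the "ugly to have everywhere" concern:

from datetime import datetime

from airflow.decorators import dag


@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def my_pipeline():
    # the whole check-then-create pattern shows up as one collapsed
    # group in the Graph view; the branch logic stays inside it
    create_external()


my_pipeline()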
Notes:

- Watch out for trigger_rule traps in pipelines containing branches (read here); a minimal join sketch follows these notes.
- Combining BranchSqlOperator and DatabricksSqlOperator, you might be able to define something like a BranchDatabricksSqlOperator that would allow test_query and branch to be a single task (also sketched below).
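On the first note: the usual trap is a downstream join task that gets skipped together with the skipped branch path. A minimal, self-contained sketch (the demo DAG and task names are illustrative only):

from datetime import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator


@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def branch_join_demo():
    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=lambda: 'query2',  # pretend the test passed
    )
    query2 = EmptyOperator(task_id='query2')
    # with the default trigger_rule ("all_success") the join would be
    # skipped whenever query2 is skipped; this rule tolerates skips
    join = EmptyOperator(task_id='join', trigger_rule='none_failed_min_one_success')

    branch >> query2 >> join
    branch >> join


branch_join_demo()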
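On the second note, a rough sketch of what such an operator could look like. It does not exist in the Databricks provider; this subclass reuses SkipMixin, the mechanism behind BranchPythonOperator, and assumes the parent execute() returns the fetched rows:

from airflow.models.skipmixin import SkipMixin
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator


class BranchDatabricksSqlOperator(DatabricksSqlOperator, SkipMixin):
    """Hypothetical: run SQL on Databricks, then branch on its result."""

    def __init__(self, *, follow_task_ids_if_true, follow_task_ids_if_false, **kwargs):
        super().__init__(**kwargs)
        self.follow_task_ids_if_true = follow_task_ids_if_true
        self.follow_task_ids_if_false = follow_task_ids_if_false

    def execute(self, context):
        # assumption: the parent execute() returns the fetched rows
        # (it does when do_xcom_push is left enabled)
        result = super().execute(context)
        follow = self.follow_task_ids_if_true if result else self.follow_task_ids_if_false
        # skip every downstream task not in the chosen branch,
        # the same mechanism BranchPythonOperator uses
        self.skip_all_except(context['ti'], follow)
        return result

With that in place, the schema check and the branching collapse into one task, e.g. BranchDatabricksSqlOperator(task_id='test_query', sql='...', follow_task_ids_if_true=['query2'], follow_task_ids_if_false=[]).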