airflow · google-cloud-composer · airflow-2.x

SpannerQueryDatabaseInstanceOperator how to perform a parametrized query


According to the documentation, SpannerQueryDatabaseInstanceOperator accepts a query parameter. However, unlike the PostgresOperator, it does not also accept a parameters argument for binding placeholders inside the query itself:

get_birth_date = PostgresOperator(
   task_id="get_birth_date",
   postgres_conn_id="postgres_default",
   sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
   parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"}
)

I am new to Airflow, but from a book I am reading and from the documentation, the suggestion seems to be to avoid the PythonOperator where possible, as it can lead to embedding business logic in the DAG rather than using Airflow for what it is designed for: orchestration.

So my questions are the following:

  1. How would you insert into Spanner values read from a previous task?
  2. I read that storing objects in XComs, or in Airflow itself, is not good practice for inter-task communication, but at the same time, if something has to be read by task one and written by task two, I don't see many alternatives to XComs.

Thanks


Solution

  • Airflow leverages Jinja for parameterization. When you use Jinja, the parameterization is done by Airflow itself, and the rendered SQL statement is then submitted to the SQL engine for execution.

    Some integrations/services have their own parameterization mechanisms, so Airflow supports those as well, and the user can choose which one to use.

    PostgresOperator can use the SQLAlchemy engine, so if you want that engine to render the statement, you can pass the variables to it via the parameters parameter. The answer at https://stackoverflow.com/a/72246305/14624409 shows how to use both options for a supported operator.

    In your case, SpannerQueryDatabaseInstanceOperator has query as a templated field, so you can simply use the Jinja engine with it.

    For example:

    SpannerQueryDatabaseInstanceOperator(
        instance_id="my_instance",
        database_id="my_db",
        query="select {{ params.my_parameter }}",
        params={"my_parameter": 5},
        task_id="spanner_instance_query_task",
    )
    

    Which gives:

    (screenshot of the rendered query in the Airflow UI)
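    Under the hood, Airflow renders templated fields with Jinja before the operator runs. A minimal sketch of that rendering step, using the jinja2 library directly (outside Airflow) with the same query and params as above:

    ```python
    from jinja2 import Template

    # The templated field as passed to the operator.
    query = "select {{ params.my_parameter }}"

    # Airflow makes `params` available in the Jinja context;
    # here we supply it by hand to show the rendered result.
    rendered = Template(query).render(params={"my_parameter": 5})
    print(rendered)  # -> select 5
    ```

    The rendered string, not the template, is what gets submitted to Spanner.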

    As for your questions:

    How would you insert into Spanner values read from a previous task?

    Simply use {{ ti.xcom_pull(task_ids='run_pod', key='return_value') }} in the SQL statement; it will be rendered by Jinja. task_ids is the task_id of the task to pull the value from, and key is the identifier of the XCom (a task can push several XComs).
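    To see the mechanics, here is a sketch (again using jinja2 directly) with a stand-in for Airflow's TaskInstance. The task id 'run_pod' and the pushed value 42 are illustrative assumptions:

    ```python
    from jinja2 import Template

    class FakeTaskInstance:
        """Stand-in for Airflow's TaskInstance, for illustration only."""
        def __init__(self, pushed):
            self._pushed = pushed

        def xcom_pull(self, task_ids, key="return_value"):
            # Look up the value a previous task pushed to XCom.
            return self._pushed[(task_ids, key)]

    # A templated query as you would pass it to the operator.
    query = (
        "INSERT INTO pet (id) VALUES "
        "({{ ti.xcom_pull(task_ids='run_pod', key='return_value') }})"
    )

    # Pretend the upstream task 'run_pod' pushed 42.
    ti = FakeTaskInstance({("run_pod", "return_value"): 42})
    rendered = Template(query).render(ti=ti)
    print(rendered)  # -> INSERT INTO pet (id) VALUES (42)
    ```

    In a real DAG you would not build the context yourself; Airflow injects ti into the template context at runtime.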

    I read that storing objects in XComs, or in Airflow itself, is not good practice for inter-task communication, but at the same time, if something has to be read by task one and written by task two, I don't see many alternatives to XComs.

    XComs are meant to make small pieces of metadata accessible to other tasks. For example, you can transfer the count of records, but not the records themselves. If a downstream task needs access to a large dataset produced by an upstream task, store it in the cloud (S3, Google Cloud Storage, etc.). All tasks can access cloud storage, whereas the local disk of an Airflow worker is not shared between tasks, so you cannot rely on data written to Airflow's disk being available to other tasks.
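    The pattern looks like this as a plain-Python sketch: the upstream callable writes the full dataset to external storage (a temp file stands in for a GCS/S3 bucket here) and returns only small metadata, which Airflow would push to XCom as the task's return value; the downstream callable pulls the metadata and reads the data from storage. All names here are illustrative:

    ```python
    import csv
    import os
    import tempfile

    def extract(records):
        # Write the full dataset to shared storage; a temp file
        # stands in for a cloud bucket in this sketch.
        path = os.path.join(tempfile.mkdtemp(), "pets.csv")
        with open(path, "w", newline="") as f:
            csv.writer(f).writerows(records)
        # Only this small dict goes to XCom, never the records.
        return {"path": path, "row_count": len(records)}

    def load(metadata):
        # Downstream task: pull the small metadata from XCom,
        # then fetch the actual data from shared storage.
        with open(metadata["path"], newline="") as f:
            return list(csv.reader(f))

    meta = extract([["1", "Fido"], ["2", "Rex"]])
    rows = load(meta)
    ```

    In a DAG, extract and load would be PythonOperator callables (or @task functions), and the metadata dict would travel between them via XCom automatically.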