I'm new to Airflow. I want to insert data into a table with PostgresOperator, using values from xcom_pull. Here's how I do it:
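from airflow.providers.postgres.operators.postgres import PostgresOperator

# One INSERT statement per row. The doubled braces {{{{ }}}} in the f-string
# become {{ }}, which Jinja renders with the XCom value at run time.
load_data = PostgresOperator(
    task_id="load_data",
    postgres_conn_id="database_my",
    sql=[f"""INSERT INTO test VALUES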
('{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['id']}}}}',
'{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['id_district']}}}}',
'{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['coord']}}}}',
'{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['address']}}}}',
'{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['properties_json']}}}}'
)
""" for i in range(0, 5)])
But I get the following error:
[2023-02-28, 20:03:48 UTC] {sql.py:375} INFO - Running statement: INSERT INTO test VALUES
(1,
3,
'[20, 50]',
'город ГОРОД',
'{'number': {'title': 'TITLE1', 'value': 'VALUE1'}, 'name': {'title': 'TITLE2', 'value': 'VALUE2 "VALUE2"'}, 'activity': {'title': 'TITLE3', 'value': 'VALUE3'}}'
)
, parameters: None
[2023-02-28, 20:03:48 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/airflow/lib/python3.8/site-packages/airflow/providers/common/sql/operators/sql.py", line 260, in execute
output = hook.run(
File "/opt/airflow/lib/python3.8/site-packages/airflow/providers/common/sql/hooks/sql.py", line 349, in run
self._run_command(cur, sql_statement, parameters)
File "/opt/airflow/lib/python3.8/site-packages/airflow/providers/common/sql/hooks/sql.py", line 380, in _run_command
cur.execute(sql_statement)
psycopg2.errors.SyntaxError: syntax error at or near "number"
LINE 5: '{'number': {'title': 'T...
^
[2023-02-28, 20:03:49 UTC] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=data_579, task_id=load_data, execution_date=20230228T190000, start_date=20230228T200348, end_date=20230228T200349
[2023-02-28, 20:03:49 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 10632 for task load_data (syntax error at or near "number"
LINE 5: '{'number': {'title': 'T...
^
; 2491897)
[2023-02-28, 20:03:49 UTC] {local_task_job.py:164} INFO - Task exited with return code 1
I would also like to know how to find the length of the data returned by xcom_pull, so that the loop runs once for each row I have.
I understand that the problem is the single quotes in the JSON value, but how do I get double quotes instead?
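For context, the single quotes come from Python itself: a plain {{ ... }} expression inserts the dict's Python string form, which quotes keys and values with single quotes, and that clashes with the single quotes around the SQL literal. A minimal sketch using only the standard library (the sample dict is copied from the log above; json.dumps stands in here for a JSON serializer and is not necessarily identical to what the template engine does):

```python
import json

# Sample value copied from the failing statement in the log.
properties = {
    "number": {"title": "TITLE1", "value": "VALUE1"},
    "name": {"title": "TITLE2", "value": 'VALUE2 "VALUE2"'},
    "activity": {"title": "TITLE3", "value": "VALUE3"},
}

# Python's text form of a dict: keys and values in single quotes.
as_python_text = str(properties)

# JSON text: keys and values in double quotes, inner double quotes escaped.
as_json_text = json.dumps(properties)

print(as_python_text)
print(as_json_text)
```

The first form cannot sit inside a single-quoted SQL string, while the second contains no single quotes for this sample and parses back to the same dict.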
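This is solved by piping the properties_json value through Jinja's tojson filter, which serializes the dict as JSON with double quotes: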
load_data = PostgresOperator(
    task_id="load_data",
    postgres_conn_id="database_my",
    sql=[f"""INSERT INTO test VALUES
('{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['id']}}}}',
'{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['id_district']}}}}',
'{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['coord']}}}}',
'{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['address']}}}}',
'{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['properties_json'] | tojson}}}}'
)
""" for i in range(0, 5)])
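On the row-count question: the list comprehension runs when the DAG file is parsed, before any XCom value exists, so range(0, 5) cannot depend on the data. One option is to do the insert from a Python task, where the pulled list is an ordinary object and len() works. A minimal sketch, assuming the rows are a list of dicts with the keys used above; build_insert is a hypothetical helper, not part of Airflow, and the hook usage in the closing comment is an untested assumption:

```python
import json

# Column order assumed to match the table "test" from the question.
COLUMNS = ["id", "id_district", "coord", "address", "properties_json"]


def build_insert(rows, table="test"):
    """Return one parameterized INSERT and one parameter tuple per row."""
    placeholders = ", ".join(["%s"] * len(COLUMNS))
    sql = f"INSERT INTO {table} VALUES ({placeholders})"
    params = []
    for row in rows:
        values = []
        for col in COLUMNS:
            value = row[col]
            # Nested structures become JSON text; quoting is left to the driver.
            if isinstance(value, (dict, list)):
                value = json.dumps(value, ensure_ascii=False)
            values.append(value)
        params.append(tuple(values))
    return sql, params


# Sample row shaped like the data in the log.
rows = [
    {
        "id": 1,
        "id_district": 3,
        "coord": [20, 50],
        "address": "город ГОРОД",
        "properties_json": {"number": {"title": "TITLE1", "value": "VALUE1"}},
    }
]

sql, params = build_insert(rows)
print(sql)
print(len(params))  # one parameter tuple per row, however many rows arrive

# Inside an Airflow Python task this could look roughly like (assumption):
#   rows = ti.xcom_pull(key="transform1", task_ids="transform_data")
#   hook = PostgresHook(postgres_conn_id="database_my")
#   for p in params:
#       hook.run(sql, parameters=p)
```

Passing the values as parameters instead of formatting them into the SQL text also sidesteps the quoting problem, since the database driver escapes each value.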