I followed the official Airflow website to build an Airflow DAG that connects to DB2. When I run a DAG that inserts or updates data, it raises jaydebeapi.Error. Even though Airflow raises jaydebeapi.Error, the data is still inserted/updated in DB2 successfully, yet the DAG is marked FAILED on the Airflow UI. I don't know what step I missed.
My DAG code snippet:
with DAG("my_dag1", default_args=default_args,
         schedule_interval="@daily", catchup=False) as dag:
    creating_table = JdbcOperator(
        task_id='creating_table',
        jdbc_conn_id='db2',
        sql=r"""
        insert into DB2ECIF.T2(C1,C1_DATE) VALUES('TEST',CURRENT DATE);
        """,
        autocommit=True,
    )
DAG log:
[2022-06-20 02:16:03,743] {base.py:68} INFO - Using connection ID 'db2' for task execution.
[2022-06-20 02:16:04,785] {dbapi.py:213} INFO - Running statement:
insert into DB2ECIF.T2(C1,C1_DATE) VALUES('TEST',CURRENT DATE);
, parameters: None
[2022-06-20 02:16:04,842] {dbapi.py:221} INFO - Rows affected: 1
[2022-06-20 02:16:04,844] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/jdbc/operators/jdbc.py", line 76, in execute
return hook.run(self.sql, self.autocommit, parameters=self.parameters, handler=fetch_all_handler)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 195, in run
result = handler(cur)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/jdbc/operators/jdbc.py", line 30, in fetch_all_handler
return cursor.fetchall()
File "/home/airflow/.local/lib/python3.7/site-packages/jaydebeapi/__init__.py", line 596, in fetchall
row = self.fetchone()
File "/home/airflow/.local/lib/python3.7/site-packages/jaydebeapi/__init__.py", line 561, in fetchone
raise Error()
jaydebeapi.Error
[2022-06-20 02:16:04,847] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=my_dag1, task_id=creating_table, execution_date=20210101T000000, start_date=, end_date=20220620T021604
I have installed the required Python packages for Airflow, listed below:
Package(System) name/Version
I have tried the latest versions of item 4 (1.2.3) and item 5 (1.4.0), but it still doesn't work. I have also downgraded Airflow to 2.2.3 and 2.2.5 with the same result.
How to solve this problem?
The error doesn't happen in the original INSERT query itself; it is caused by a fetchall() introduced in this PR - https://github.com/apache/airflow/pull/23817
Downgrading to apache-airflow-providers-jdbc/2.1.3 might be an easy workaround.
To find the root cause, set the DEBUG logging level in Airflow and see why the fetchall() causes the error; having the full traceback will help.
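The failure mode can be illustrated with a small self-contained sketch (the classes below are simplified stand-ins, not the real jaydebeapi source): a successful INSERT leaves the cursor with no result set to read, so the fetchone() that fetchall() relies on raises a bare Error, even though the rows were already committed.

```python
# Simplified stand-ins for jaydebeapi internals (illustrative only,
# not the real library source).
class Error(Exception):
    """Plays the role of jaydebeapi.Error."""

class FakeCursor:
    """Mimics a cursor after a successful INSERT: rows were written,
    but there is no ResultSet to read from."""

    def __init__(self):
        self._rs = None  # DML statements produce no result set

    def fetchone(self):
        if self._rs is None:
            # Mirrors the bare `raise Error()` seen in the traceback.
            raise Error()
        return None

    def fetchall(self):
        # The provider's fetch_all_handler effectively does this:
        rows = []
        while True:
            row = self.fetchone()  # raises immediately after an INSERT
            if row is None:
                break
            rows.append(row)
        return rows

cur = FakeCursor()
try:
    cur.fetchall()
except Error:
    print("jaydebeapi.Error: the INSERT committed, but fetchall() failed")
```

This is why the task fails after the "Rows affected: 1" log line: the statement succeeds and autocommit persists it, and only the subsequent result-set fetch blows up.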