db2, airflow, jpype, jaydebeapi

Airflow DAG manipulating DB2 data raises a jaydebeapi.Error


I followed the official Airflow documentation to build a DAG that connects to DB2. When I run a DAG to insert or update data, it raises a jaydebeapi.Error. Even though Airflow raises the error, the data is still inserted/updated in DB2 successfully, yet the DAG is marked FAILED in the Airflow UI. I don't know what step I'm missing.

My DAG code snippet:

with DAG("my_dag1", default_args=default_args,
    schedule_interval="@daily", catchup=False) as dag:

    creating_table = JdbcOperator(
        task_id='creating_table',
        jdbc_conn_id='db2',
        sql=r"""
            insert into DB2ECIF.T2(C1,C1_DATE) VALUES('TEST',CURRENT DATE);
        """,
        autocommit=True,
        dag=dag
    )

DAG log:

[2022-06-20 02:16:03,743] {base.py:68} INFO - Using connection ID 'db2' for task execution.
[2022-06-20 02:16:04,785] {dbapi.py:213} INFO - Running statement: 
            insert into DB2ECIF.T2(C1,C1_DATE) VALUES('TEST',CURRENT DATE);
        , parameters: None
[2022-06-20 02:16:04,842] {dbapi.py:221} INFO - Rows affected: 1
[2022-06-20 02:16:04,844] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/jdbc/operators/jdbc.py", line 76, in execute
    return hook.run(self.sql, self.autocommit, parameters=self.parameters, handler=fetch_all_handler)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 195, in run
    result = handler(cur)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/jdbc/operators/jdbc.py", line 30, in fetch_all_handler
    return cursor.fetchall()
  File "/home/airflow/.local/lib/python3.7/site-packages/jaydebeapi/__init__.py", line 596, in fetchall
    row = self.fetchone()
  File "/home/airflow/.local/lib/python3.7/site-packages/jaydebeapi/__init__.py", line 561, in fetchone
    raise Error()
jaydebeapi.Error
[2022-06-20 02:16:04,847] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=my_dag1, task_id=creating_table, execution_date=20210101T000000, start_date=, end_date=20220620T021604
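The traceback shows the failure happens in `fetchall`, after the INSERT has already run ("Rows affected: 1"). A hypothetical minimal sketch (class and attribute names are mine, mirroring the jaydebeapi frames in the traceback) of why fetching from a statement that returns no result set raises:

```python
class Error(Exception):
    """Stand-in for jaydebeapi.Error."""

class FakeCursor:
    """Mimics a jaydebeapi cursor right after an INSERT: no open result set."""

    def __init__(self):
        self._rs = None  # an INSERT produces no result set

    def fetchone(self):
        if self._rs is None:
            raise Error()  # jaydebeapi raises here when there is nothing to fetch
        return self._rs.pop(0) if self._rs else None

    def fetchall(self):
        # fetchall loops over fetchone, so it raises immediately after an INSERT
        rows = []
        row = self.fetchone()
        while row is not None:
            rows.append(row)
            row = self.fetchone()
        return rows

cur = FakeCursor()
try:
    cur.fetchall()
except Error:
    print("Error raised: no result set to fetch after an INSERT")
```

This matches the observed behaviour: the data is committed, but the operator's fetch step fails afterwards.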

I have installed the required Python packages for Airflow, listed below:

Package (system) name / version

  1. Airflow/2.3.2
  2. IBM DB2/11.5.7
  3. OpenJDK/15.0.2
  4. JayDeBeApi/1.2.0
  5. JPype1/0.7.2
  6. apache-airflow-providers-jdbc/3.0.0

I have tried the latest versions of item 4 (1.2.3) and item 5 (1.4.0), but it still doesn't work. I have also downgraded Airflow to 2.2.3 and 2.2.5 with the same result.

How to solve this problem?


Solution

  • The error doesn't happen in the original INSERT query; it is caused by a fetchall introduced in this PR: https://github.com/apache/airflow/pull/23817

    Using apache-airflow-providers-jdbc/2.1.3 might be an easy workaround.

    To find the root cause, set the DEBUG logging level in Airflow and see why the fetchall causes the error. Having the full traceback will help.
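The suggested downgrade can be applied with pip (the exact command is an assumption; adjust to your deployment method and constraints file, and restart the scheduler and workers afterwards):

```shell
# Pin the JDBC provider to a version that predates the fetchall change
pip install "apache-airflow-providers-jdbc==2.1.3"
```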