
Cannot add the JDBC driver to the Sqoop command when running an import with Airflow 2.5.0


I am running a Sqoop import command that imports a table from a MySQL database into HDFS. I have created the DAG below to perform this:

from airflow.models import DAG
from airflow.contrib.operators.sqoop_operator import SqoopOperator
from airflow.utils.dates import days_ago


Dag_Sqoop_Import = DAG(dag_id="SqoopImport",
                      schedule_interval="* * * * *",
                      start_date=days_ago(2))

sqoop_mysql_import = SqoopOperator(conn_id="sqoop_local",
                                  table="shipmethod",
                                  cmd_type="import",
                                  target_dir="/airflow_sqoopImport",
                                  num_mappers=1,
                                  task_id="SQOOP_Import",
                                  dag=Dag_Sqoop_Import)

sqoop_mysql_import

I have also created the corresponding Sqoop connection in Airflow.

When I trigger the job, I expect it to run the following command:

sqoop import --connect jdbc:mysql://192.168.0.15:3306/adventureworks?characterEncoding=latin1 --driver com.mysql.jdbc.Driver --username xxxx --password xxxxxx --autoreset-to-one-mapper --table workorder --target-dir /user/adminn/workorder

However, the task log shows that it actually runs this command:

Executing command: sqoop import --username xxxx --password MASKED --num-mappers 1 --connect jdbc:mysql://192.168.0.15:3306/adventureworks?characterEncoding=latin1 --target-dir /airflow_sqoopImport --as-textfile --table shipmethod
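The logged command appears to be assembled by the Sqoop hook from the Airflow connection fields (login, password, host, port, schema) plus the operator's arguments. The connection's host/port/schema only feed the --connect string, so there is no connection field that would produce --driver. The sketch below is a simplified, hypothetical re-creation of that assembly: the function build_connection_args and its parameters are assumptions for illustration, not Airflow's API, and the split of the URL into host, port and schema is inferred from the logged --connect value.

```python
from __future__ import annotations

from typing import Optional


def build_connection_args(
    login: Optional[str],
    password: Optional[str],
    host: str,
    port: Optional[int],
    schema: Optional[str],
    num_mappers: Optional[int] = None,
) -> list[str]:
    """Hypothetical, simplified re-creation of how connection fields become
    the start of a `sqoop import` command (not Airflow's actual code)."""
    cmd = ["sqoop", "import"]
    if login:
        cmd += ["--username", login]
    if password:
        cmd += ["--password", password]
    if num_mappers:
        cmd += ["--num-mappers", str(num_mappers)]
    # --connect is built as host[:port][/schema]; no field here carries a driver class.
    connect_str = host
    if port:
        connect_str += f":{port}"
    if schema:
        connect_str += f"/{schema}"
    cmd += ["--connect", connect_str]
    return cmd


args = build_connection_args(
    login="xxxx",
    password="MASKED",  # placeholder; Airflow masks the real password in its logs
    host="jdbc:mysql://192.168.0.15",
    port=3306,
    schema="adventureworks?characterEncoding=latin1",
    num_mappers=1,
)
print(" ".join(args))
```

Printing the result reproduces the first part of the logged command, and nothing in it can introduce --driver.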

The DAG then fails with the error below. I know the cause: the command needs the --driver com.mysql.jdbc.Driver parameter, which should fix this error. However, I am struggling to add it. Can you please tell me where I am going wrong?

ERROR manager.SqlManager: Error reading from database: java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@5906ebcb is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.

Any replies are appreciated, thanks.


Solution

  • Pass the driver class as an argument to the operator (its driver parameter), not through the connection:

    sqoop_mysql_import = SqoopOperator(conn_id="sqoop_local",
                                      table="shipmethod",
                                      cmd_type="import",
                                      target_dir="/airflow_sqoopImport",
                                      driver="com.mysql.jdbc.Driver",  # appended to the sqoop command as --driver
                                      num_mappers=1,
                                      task_id="SQOOP_Import",
                                      dag=Dag_Sqoop_Import)
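To see where the new argument ends up, here is a minimal, hypothetical sketch of how operator arguments might be appended after the connection part of the command. The helper build_import_args is an assumption for illustration, and the flag order mirrors the logged command as I understand the provider's behaviour; it may differ between provider versions. It also assumes SqoopOperator's extra_import_options argument, a dict where each key becomes --key followed by its value if non-empty, could supply flags that have no dedicated parameter, such as --autoreset-to-one-mapper from the expected command. Check this against your installed apache-sqoop provider.

```python
from __future__ import annotations

from typing import Optional


def build_import_args(
    table: str,
    target_dir: Optional[str] = None,
    driver: Optional[str] = None,
    extra_import_options: Optional[dict[str, str]] = None,
) -> list[str]:
    """Hypothetical, simplified re-creation of how import arguments are
    appended to the command (connection part omitted)."""
    cmd: list[str] = []
    if target_dir:
        cmd += ["--target-dir", target_dir]
    cmd += ["--as-textfile"]  # matches the default text file type seen in the log
    if driver:
        cmd += ["--driver", driver]
    # Each extra option becomes --key, followed by its value only if non-empty.
    for key, value in (extra_import_options or {}).items():
        cmd += [f"--{key}"]
        if value:
            cmd += [str(value)]
    cmd += ["--table", table]
    return cmd


without_driver = build_import_args("shipmethod", "/airflow_sqoopImport")
with_driver = build_import_args(
    "shipmethod",
    "/airflow_sqoopImport",
    driver="com.mysql.jdbc.Driver",
    extra_import_options={"autoreset-to-one-mapper": ""},
)
print(" ".join(without_driver))
print(" ".join(with_driver))
```

The first call reproduces the tail of the logged command; the second shows --driver com.mysql.jdbc.Driver (and the optional --autoreset-to-one-mapper flag) appearing before --table.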