I am running a Sqoop import that reads a table from a MySQL database and loads it into HDFS. I have created the DAG below to perform this.
from airflow.models import DAG
from airflow.contrib.operators.sqoop_operator import SqoopOperator
from airflow.utils.dates import days_ago

Dag_Sqoop_Import = DAG(dag_id="SqoopImport",
                       schedule_interval="* * * * *",
                       start_date=days_ago(2))

sqoop_mysql_import = SqoopOperator(conn_id="sqoop_local",
                                   table="shipmethod",
                                   cmd_type="import",
                                   target_dir="/airflow_sqoopImport",
                                   num_mappers=1,
                                   task_id="SQOOP_Import",
                                   dag=Dag_Sqoop_Import)

sqoop_mysql_import
I have also created a connection in Airflow for this Sqoop import.
When I trigger the job, I expect it to run a command like the following:
sqoop import --connect jdbc:mysql://192.168.0.15:3306/adventureworks?characterEncoding=latin1 --driver com.mysql.jdbc.Driver --username xxxx --password xxxxxx --autoreset-to-one-mapper --table workorder --target-dir /user/adminn/workorder
But when I check the logs, it is actually running the following command:
Executing command: sqoop import --username xxxx --password MASKED --num-mappers 1 --connect jdbc:mysql://192.168.0.15:3306/adventureworks?characterEncoding=latin1 --target-dir /airflow_sqoopImport --as-textfile --table shipmethod
The DAG then fails with the error below. I know the cause: I need to add the parameter --driver com.mysql.jdbc.Driver, which resolves this error, but I am struggling to add it. Can you please tell me where I am going wrong?
ERROR manager.SqlManager: Error reading from database: java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@5906ebcb is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.'
Any replies are appreciated, thanks.
You should provide the driver class as an argument to the operator, not in the connection:
sqoop_mysql_import = SqoopOperator(conn_id="sqoop_local",
                                   table="shipmethod",
                                   cmd_type="import",
                                   target_dir="/airflow_sqoopImport",
                                   driver="com.mysql.jdbc.Driver",
                                   num_mappers=1,
                                   task_id="SQOOP_Import",
                                   dag=Dag_Sqoop_Import)
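
To see why the flag was missing from the logged command, it can help to think of the operator as turning its keyword arguments into command-line flags, so an argument that is never passed never shows up. The sketch below is only a simplified illustration of that idea, not Airflow's actual code: `build_sqoop_import_cmd` is a hypothetical helper, credentials are left out, and the flag order is an assumption.

```python
from typing import List, Optional


def build_sqoop_import_cmd(
    connect: str,
    table: str,
    target_dir: str,
    num_mappers: Optional[int] = None,
    driver: Optional[str] = None,
) -> List[str]:
    """Hypothetical helper (not part of Airflow): build a `sqoop import` argument list."""
    cmd = ["sqoop", "import"]
    if num_mappers is not None:
        cmd += ["--num-mappers", str(num_mappers)]
    cmd += ["--connect", connect, "--target-dir", target_dir]
    # The --driver flag is only emitted when a driver class is passed in.
    if driver:
        cmd += ["--driver", driver]
    cmd += ["--table", table]
    return cmd


# JDBC URL, table and target directory taken from the log line in the question.
JDBC_URL = "jdbc:mysql://192.168.0.15:3306/adventureworks?characterEncoding=latin1"

without_driver = build_sqoop_import_cmd(
    JDBC_URL, "shipmethod", "/airflow_sqoopImport", num_mappers=1
)
with_driver = build_sqoop_import_cmd(
    JDBC_URL,
    "shipmethod",
    "/airflow_sqoopImport",
    num_mappers=1,
    driver="com.mysql.jdbc.Driver",
)

print(" ".join(without_driver))
print(" ".join(with_driver))
```

After adding `driver=` to the real operator, the "Executing command:" line in the task log should be the place to confirm that `--driver com.mysql.jdbc.Driver` is now present.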