I am managing a data pipeline with Kedro. At the last step I have a huge CSV file stored in an S3 bucket, and I need to load it back into SQL Server.
I'd normally do that with a BULK INSERT, but I'm not quite sure how to fit that into the Kedro templates. These are the destination table and the S3 bucket as configured in catalog.yml:
flp_test:
  type: pandas.SQLTableDataSet
  credentials: dw_dev_credentials
  table_name: flp_tst
  load_args:
    schema: 'dwschema'
  save_args:
    schema: 'dwschema'
    if_exists: 'replace'

bulk_insert_input:
  type: pandas.CSVDataSet
  filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
  credentials: dev_s3
This is the function I would use to run the BULK INSERT:

def insert_data(self, conn, csv_file_nm, db_table_nm):
    # Build the BULK INSERT statement for the target table and CSV file
    qry = "BULK INSERT " + db_table_nm + " FROM '" + csv_file_nm + "' WITH (FORMAT = 'CSV')"
    # Execute the query and commit
    cursor = conn.cursor()
    cursor.execute(qry)
    conn.commit()
    cursor.close()
How do I point csv_file_nm at my bulk_insert_input S3 catalog entry, and how do I pass dw_dev_credentials to do the insert?

Kedro's pandas.SQLTableDataSet uses the pandas.to_sql method as is. To use it as is, you would need one pandas.CSVDataSet loaded into a node, which then writes to a target pandas.SQLTableDataSet in order to write it to SQL. If you have Spark available, this will be faster than pandas.
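A minimal sketch of that node-based approach, assuming a standard Kedro pipeline module; the function and node names here are placeholders of my own, not from your project:

# nodes.py -- a passthrough node: Kedro loads bulk_insert_input (pandas.CSVDataSet)
# and saves whatever the node returns to flp_test (pandas.SQLTableDataSet, i.e. to_sql)
import pandas as pd

def csv_to_sql(df: pd.DataFrame) -> pd.DataFrame:
    # No transformation needed; the catalog entries handle all of the IO
    return df

# pipeline.py
from kedro.pipeline import Pipeline, node

def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            node(
                func=csv_to_sql,
                inputs="bulk_insert_input",
                outputs="flp_test",
                name="csv_to_sql_node",
            ),
        ]
    )

Note that this routes the entire CSV through pandas in memory, which is exactly the overhead a real BULK INSERT avoids.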
In order to use the built-in BULK INSERT query, I think you will need to define a custom dataset.
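A rough sketch of such a custom dataset, assuming pyodbc and Kedro's AbstractDataSet interface; the class name, the "con" credentials key, and the write-only design are my own choices rather than anything built into Kedro:

from typing import Any, Dict

import pyodbc
from kedro.io import AbstractDataSet


class SQLServerBulkInsertDataSet(AbstractDataSet):
    """Hypothetical write-only dataset that issues a BULK INSERT on save().

    The value passed to save() is expected to be a file path that the SQL
    Server instance itself can read, because BULK INSERT resolves the path
    on the server, not on the machine running Kedro.
    """

    def __init__(self, table_name: str, credentials: Dict[str, Any]):
        self._table_name = table_name
        # Assumes the credentials entry provides a pyodbc connection string
        # under a "con" key, e.g. dw_dev_credentials in credentials.yml
        self._con = credentials["con"]

    def _save(self, csv_file_nm: str) -> None:
        # Build and run the BULK INSERT statement against the target table
        qry = (
            f"BULK INSERT {self._table_name} "
            f"FROM '{csv_file_nm}' WITH (FORMAT = 'CSV')"
        )
        conn = pyodbc.connect(self._con)
        try:
            cursor = conn.cursor()
            cursor.execute(qry)
            conn.commit()
            cursor.close()
        finally:
            conn.close()

    def _load(self) -> None:
        raise NotImplementedError("This dataset is write-only.")

    def _describe(self) -> Dict[str, Any]:
        return dict(table_name=self._table_name)

You would then register it in catalog.yml with type: set to the full Python path of the class, give it the table name, and reuse credentials: dw_dev_credentials; Kedro resolves that entry from credentials.yml and passes the resulting dict into the constructor.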