sql-server, bulk-insert, bulk-load, kedro

How to use SQL Server BULK INSERT in a Kedro node?


I am managing a data pipeline using Kedro. At the last step, I have a huge CSV file stored in an S3 bucket, and I need to load it back into SQL Server.

I'd normally go about that with a bulk insert, but I'm not quite sure how to fit that into the Kedro templates. These are the destination table and the S3 bucket as configured in catalog.yml:

flp_test:
  type: pandas.SQLTableDataSet
  credentials: dw_dev_credentials
  table_name: flp_tst
  load_args:
    schema: 'dwschema'
  save_args:
    schema: 'dwschema'
    if_exists: 'replace'

bulk_insert_input:
  type: pandas.CSVDataSet
  filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
  credentials: dev_s3


def insert_data(self, conn, csv_file_nm, db_table_nm):
    # Build the BULK INSERT statement; the table and file names are
    # concatenated directly, so they must come from trusted input.
    qry = "BULK INSERT " + db_table_nm + " FROM '" + csv_file_nm + "' WITH (FORMAT = 'CSV')"
    # Execute the query
    cursor = conn.cursor()
    cursor.execute(qry)
    conn.commit()
    cursor.close()

Solution

  • Kedro's pandas.SQLTableDataSet uses pandas' DataFrame.to_sql method as is. To use it, you would read a pandas.CSVDataSet into a node which then writes to a target pandas.SQLTableDataSet in order to write it to SQL (see the node sketch after this list). If you have Spark available, this will be faster than Pandas.

    In order to use the built-in BULK INSERT query, I think you will need to define a custom dataset; a hedged sketch of one also follows below.
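
Here is a minimal sketch of that pure-pandas route. The node simply passes the DataFrame through, and Kedro performs the I/O declared in catalog.yml (pandas.CSVDataSet in, pandas.SQLTableDataSet out); the function and node names are illustrative, not part of Kedro's API.

import pandas as pd
from kedro.pipeline import Pipeline, node


def pass_through(df: pd.DataFrame) -> pd.DataFrame:
    # No transformation needed: saving the output dataset is what
    # triggers DataFrame.to_sql via pandas.SQLTableDataSet.
    return df


def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            node(
                func=pass_through,
                inputs="bulk_insert_input",  # the S3 CSV dataset
                outputs="flp_test",          # the SQL Server table dataset
                name="load_to_sql",
            )
        ]
    )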
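
And here is a hedged sketch of a custom dataset that issues BULK INSERT through pyodbc. Only the _load/_save/_describe interface comes from Kedro's AbstractDataSet; the class name, constructor arguments and the "con" credentials key are assumptions. Keep in mind that BULK INSERT reads the file on the SQL Server host, so the filepath must be reachable from the server (e.g. a network share, or an Azure blob set up as an external data source); a CSV that exists only in S3 cannot be referenced directly.

from typing import Any, Dict

import pyodbc
from kedro.io import AbstractDataSet


class SQLServerBulkInsertDataSet(AbstractDataSet):
    def __init__(self, filepath: str, table_name: str, credentials: Dict[str, Any]):
        self._filepath = filepath
        self._table_name = table_name
        self._connection_string = credentials["con"]  # assumed credentials layout

    def _save(self, data: Any) -> None:
        # `data` is ignored: the rows come from the server-side file.
        query = (
            f"BULK INSERT {self._table_name} "
            f"FROM '{self._filepath}' WITH (FORMAT = 'CSV')"
        )
        conn = pyodbc.connect(self._connection_string)
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            conn.commit()
            cursor.close()
        finally:
            conn.close()

    def _load(self) -> Any:
        raise NotImplementedError("This dataset is write-only.")

    def _describe(self) -> Dict[str, Any]:
        return dict(filepath=self._filepath, table_name=self._table_name)

In catalog.yml you would then reference it by its import path (e.g. type: my_project.extras.datasets.SQLServerBulkInsertDataSet, where the module path is hypothetical) instead of pandas.SQLTableDataSet.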