python, python-3.x, aws-glue, pymssql

How to bulk insert data into an MSSQL database in an AWS Glue Python shell job?


I have large data sets in S3. In my Python Glue job, I read those files into a pandas DataFrame, apply the necessary transformations, and then load the result into a Microsoft SQL Server database using the pymssql library. The final DataFrame averages 100-200K rows and 180 columns. The problem is that the cursor's executemany call takes too long to load the data: approximately 20 minutes for 100K rows. I checked the logs, and the load step is always the slow part. How can I load the data faster? Here is my code:

import datetime

import numpy as np
import pandas as pd

# s3 (boto3 S3 client) and db_cursor (pymssql cursor) are created earlier in the job
all_data = []
file = s3.get_object(Bucket=S3_BUCKET_NAME, Key=each_file)
for chunk in pd.read_csv(file['Body'], sep=",", header=None, low_memory=False, chunksize=100000):
    all_data.append(chunk)

data_frame = pd.concat(all_data, axis=0)
all_data.clear()
# Strip whitespace from string columns, then treat empty strings as missing
cols = data_frame.select_dtypes(object).columns
data_frame[cols] = data_frame[cols].apply(lambda x: x.str.strip())
data_frame.replace(to_replace='', value=np.nan, inplace=True)
data_frame.fillna(value=np.nan, inplace=True)
data_frame.insert(0, 'New-column', 1111)
# Convert NaN to None so the driver sends NULL, then build one tuple per row
sql_data_array = data_frame.replace({np.nan: None}).to_numpy()
sql_data_tuple = tuple(map(tuple, sql_data_array))
try:
    sql = "insert into [db].[schema].[table](column_names)values(%d,%s,%s,%s,%s,%s...)"
    db_cursor.executemany(sql, sql_data_tuple)
    print("loading completed on {}".format(datetime.datetime.now()))
except Exception as e:
    print(e)
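
With 180 columns, the placeholder list in the statement above is easier to generate than to type by hand. The following is only a minimal sketch: `build_insert_sql` and the column names `col_a` and `col_b` are hypothetical, and it assumes the driver accepts `%s` as the placeholder for every column regardless of type. It only builds the statement text and does not make the insert itself any faster.

```python
def build_insert_sql(table, columns):
    """Return a parameterized INSERT statement with one %s per column."""
    if not columns:
        raise ValueError("at least one column is required")
    # Bracket-quote each column name, e.g. New-column -> [New-column]
    column_list = ", ".join("[{}]".format(name) for name in columns)
    # One placeholder per column, joined with commas
    placeholders = ", ".join(["%s"] * len(columns))
    return "insert into {} ({}) values ({})".format(table, column_list, placeholders)


# Hypothetical column names, for illustration only
sql = build_insert_sql("[db].[schema].[table]", ["New-column", "col_a", "col_b"])
print(sql)
```

The resulting string can be passed to `executemany` together with the row tuples, as in the code above.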

Solution

  • I ended up with the following approach, which gave me much better results (1 million rows in 11 minutes). It uses a Glue 2.0 Python job instead of a Python shell job:

    1. Extracted the data from S3.
    2. Transformed it using pandas.
    3. Uploaded the transformed file to S3 as a CSV.
    4. Created a dynamic frame from a catalog table that a crawler had created by crawling the transformed CSV file. Alternatively, you can create the dynamic frame directly with create_dynamic_frame.from_options.
    5. Wrote the dynamic frame to the catalog table that a crawler had created by crawling the destination MSSQL table.

    Here is the code I've used:

    # Requires boto3, numpy (as np), pandas (as pd) and StringIO (from io).
    # s3 (boto3 S3 client) and all_data (an empty list) are created earlier in the job.
    csv_buffer = StringIO()
    s3_resource = boto3.resource("s3", region_name=AWS_REGION)
    file = s3.get_object(Bucket=S3_BUCKET_NAME, Key=each_file)
    for chunk in pd.read_csv(file['Body'], sep=",", header=None, low_memory=False, chunksize=100000):
        all_data.append(chunk)

    data_frame = pd.concat(all_data, axis=0)
    all_data.clear()
    # Strip whitespace from string columns, then treat empty strings as missing
    cols = data_frame.select_dtypes(object).columns
    data_frame[cols] = data_frame[cols].apply(lambda x: x.str.strip())
    data_frame.replace(to_replace='', value=np.nan, inplace=True)
    data_frame.fillna(value=np.nan, inplace=True)
    data_frame.insert(0, 'New-column', 1234)

    # Serialize the transformed frame to an in-memory CSV and upload it to S3
    data_frame.to_csv(csv_buffer)
    result = s3_resource.Object(S3_BUCKET_NAME, 'path in s3').put(Body=csv_buffer.getvalue())
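    # glueContext and job are assumed to be initialized earlier in the Glue script.
    # Read the transformed CSV through the catalog table created by the source crawler
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database="source db name", table_name="source table name",
                                                                transformation_ctx="datasource0")

    # mappings: (source column, source type, target column, target type) tuples
    applymapping1 = ApplyMapping.apply(frame=datasource0, mappings=[mappings], transformation_ctx="applymapping1")

    # Keep only the columns that exist in the destination table
    selectfields2 = SelectFields.apply(frame=applymapping1,
                                       paths=[...],  # column names of the destination catalog table
                                       transformation_ctx="selectfields2")

    # Align column types with the destination catalog table
    resolvechoice3 = ResolveChoice.apply(frame=selectfields2, choice="MATCH_CATALOG", database="destination db name",
                                         table_name="destination table name", transformation_ctx="resolvechoice3")

    resolvechoice4 = ResolveChoice.apply(frame=resolvechoice3, choice="make_cols", transformation_ctx="resolvechoice4")

    # Write to the MSSQL table through the catalog table created by the destination crawler
    datasink5 = glueContext.write_dynamic_frame.from_catalog(frame=resolvechoice4, database="destination db name",
                                                             table_name="destination table name",
                                                             transformation_ctx="datasink5")

    job.commit()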
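
Steps 2 and 3 (clean the rows, then serialize them back to CSV for upload) can be tried out locally without pandas or AWS access. The sketch below is a stand-in, not the code used in the job: it swaps pandas for the standard `csv` module, the helper names `clean_rows` and `rows_to_csv` are hypothetical, and the sample input is made up. It mirrors the same transformations: strip whitespace, treat empty strings as missing, and prepend a constant first column.

```python
import csv
import io


def clean_rows(rows, constant):
    """Strip whitespace, map empty strings to None, and prepend a constant column."""
    cleaned = []
    for row in rows:
        values = [cell.strip() for cell in row]
        # After stripping, whitespace-only cells are empty and count as missing
        values = [value if value != "" else None for value in values]
        cleaned.append([constant] + values)
    return cleaned


def rows_to_csv(rows):
    """Serialize rows to CSV text; csv.writer writes None as an empty field."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


# Made-up sample input standing in for the file body read from S3
raw = io.StringIO("a , b,c\n1,, 3 \n")
cleaned = clean_rows(csv.reader(raw), constant=1234)
csv_text = rows_to_csv(cleaned)
print(csv_text)
```

In the job itself, the resulting text plays the role of `csv_buffer.getvalue()`, that is, the body uploaded to S3 before the crawler and the dynamic frame take over.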