python aws-lambda amazon-dynamodb amazon-dynamodb-index

Lambda writing CSV records into a DynamoDB table does not write all records to that table


I have written a Python Lambda function that uses a manifest to write CSV file records to a DynamoDB table.

  1. An external SA integration loads both the manifest file and the CSV files into an S3 bucket. The CSV file names and their record sizes are listed in the manifest file.
  2. Then, a transfer Lambda examines the S3 bucket's contents and uses DynamoDB auto-scaling for cases where the manifest lists several files, each of which can hold up to 100K records (the record sizes and the number of files in the manifest can vary).
  3. The transfer Lambda calls a second Lambda to load the records of the CSV files listed in the manifest into a DynamoDB table. This second Lambda is the one causing the issue.

However, when I run a sample test where the manifest contains only 1 CSV file with only 10 records, only 2 records end up in the DynamoDB table.

Here is the key index used, along with the read and write capacity and the projected attributes: [screenshot]

Here is the manifest listing the CSV file of 10 records: [screenshot]

Here is the integration code that loads the records of the manifest's CSV file into the DynamoDB table.

try:
    # Read the whole CSV object from S3 and split it into lines for DictReader
    response = obj.get()['Body'].read().decode('latin-1').split('\n')
    with table.batch_writer() as batch:
        for row in csv.DictReader(response, delimiter=','):
            # UTC timestamp with one-second resolution, stored as lastModifiedDate
            output_date = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
            #print('<< ROW  >> ',row)
            #HANDLE EXTRANEOUS UNICODE CHARACTERS
            record = {}
            serviceAddress = {}
            householdInfo = {}
            if row['cac_addr_zip'] not in (' ', '\\N'):
                # Drop any decimal part (e.g. "1234.0") and left-pad to 5 digits
                zip_code = str(row['cac_addr_zip']).split('.')[0]
                serviceAddress['zip'] = zip_code.zfill(5)
                #print('<<zip>> ', serviceAddress['zip'])
            if row['cac_addr_full'] not in (' ', '\\N'):
                serviceAddress['address'] = str(row['cac_addr_full'])
                #print('<<address>> ', serviceAddress['address'])
            if row['mKey5'] not in (' ', '\\N'):
                record['mKey5'] = str(row['mKey5'])

                #print('<<mKey5>> ', dyRecord['mKey5'])

            if row['cac_addr_city'] not in (' ', '\\N'):
                serviceAddress['city'] = str(row['cac_addr_city'])
                #print('<<city>> ',serviceAddress['city'])

            
            if row['cac_addr_state'] not in (' ', '\\N'):
                serviceAddress['state'] = str(row['cac_addr_state'])
                #print('<<state>> ',serviceAddress['state'])

            if row['cac_hh_built_year'] not in (' ', '\\N'):
                householdInfo['cac_hh_built_year'] = str(row['cac_hh_built_year'])
                #print('<<cac_hh_built_year>> ',householdInfo['cac_hh_built_year'] )
               
            if row['cac_home_sq_foot_num'] not in (' ', '\\N'):
                householdInfo['cac_home_sq_foot_num'] = str(row['cac_home_sq_foot_num'])
                #print('<<cac_home_sq_foot_num>> ',dyRecord['cac_home_sq_foot_num'])
                
            if row['cac_demo_num_in_hh'] not in (' ', '\\N'):
                householdInfo['cac_demo_num_in_hh'] = str(row['cac_demo_num_in_hh'])
                #print('<<cac_demo_num_in_hh>> ',row['cac_demo_num_in_hh'])
               
            if row['cac_demo_num_adults'] not in (' ', '\\N'):
                householdInfo['cac_demo_num_adults'] = str(row['cac_demo_num_adults'])
                #print('<<cac_demo_num_adults>> ',row['cac_demo_num_adults'])

            if row['cac_demo_hoh_age'] not in (' ', '\\N'):
                householdInfo['cac_demo_hoh_age'] = str(row['cac_demo_hoh_age'])
                #print('<<cac_demo_hoh_age>> ',row['cac_demo_hoh_age'])

            if row['cac_hh_home_own'] not in (' ', '\\N'):
                householdInfo['cac_hh_home_own'] = str(row['cac_hh_home_own'])
                #print('<<cac_hh_home_own>> ',row['cac_hh_home_own'])
                
            if row['cac_demo_num_kids'] not in (' ', '\\N'):
                householdInfo['cac_demo_num_kids'] = str(row['cac_demo_num_kids'])
                #print('<<cac_demo_num_kids>> ',row['cac_demo_num_kids'])
               
            if row['cac_demo_income'] not in (' ', '\\N'):
                householdInfo['cac_demo_income'] = str(row['cac_demo_income'])
                #print('<<cac_demo_income>> ',row['cac_demo_income'])
               
            if row['cac_int_112'] not in (' ', '\\N'):
                householdInfo['cac_int_112'] = str(row['cac_int_112'])
                #print('<<cac_int_112>> ',row['cac_int_112'])
              
            if row['cac_silh_super'] not in (' ', '\\N') :
                householdInfo['cac_silh_super'] = str(row['cac_silh_super'])
                #print('<<cac_silh_super>> ',row['cac_silh_super'])
               
            if row['lifedriver_class'] not in (' ', '\\N') :
                householdInfo['lifedriver_class'] = str(row['lifedriver_class'])
                #print('<<lifedriver_class>> ',row['lifedriver_class'])
                
            if row['CAC_HH_AIRCONDITIONING'] not in (' ', '\\N'):
                householdInfo['CAC_HH_AIRCONDITIONING'] = str(row['CAC_HH_AIRCONDITIONING'])
                #print('<<CAC_HH_AIRCONDITIONING>> ',row['CAC_HH_AIRCONDITIONING'])
              
            if row['CAC_HH_HOMEHEATINDICATOR'] not in (' ', '\\N'):
                householdInfo['CAC_HH_HOMEHEATINDICATOR'] = str(row['CAC_HH_HOMEHEATINDICATOR'])
                #print('<<CAC_HH_HOMEHEATINDICATOR>> ',row['CAC_HH_HOMEHEATINDICATOR'])
               
            if row['CAC_HH_RES_LENGTH'] not in (' ', '\\N'):
                householdInfo['CAC_HH_RES_LENGTH'] = str(row['CAC_HH_RES_LENGTH'])
                #print('<<CAC_HH_RES_LENGTH>> ',row['CAC_HH_RES_LENGTH'])
                
            record['lastModifiedDate'] = output_date
           
            # INSERTING NEW RECORDS    
            record['serviceAddress'] = serviceAddress
            record['householdInfo'] = householdInfo
            print('<< RECORDS FOR FILE NAME >> ',file_name,record)
            # Note: this calls table.put_item directly, so the batch_writer opened above is never used
            table.put_item(Item=record)
            count += 1
            # print(count)
                
except Exception as e:
    message = "Process Failed, count = {count} and reason: ".format(count = count)
    print(message,e)

Here is the Lambda's log output, which shows that 10 records were written to the DynamoDB table: [screenshot]

Finally, here is the DynamoDB table with only 2 records: [screenshot]

Thanks.


Solution

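  • You're using datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") as the partition key for your items. That timestamp only has one-second resolution, so every row processed within the same second gets the same key, and each put_item overwrites the previous item with that key. In DynamoDB, the primary key must be unique: the partition key on its own, or the combination of partition key and sort key if a sort key is defined.

    Make sure you have a unique primary key for each and every item in the base table.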
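
To make the overwrite behaviour concrete, here is a minimal sketch that needs no AWS access. `FakeTable` is a hypothetical in-memory stand-in for a table with a single partition key, `recordId` is an assumed attribute name that does not appear in the question, and the 150 ms spacing between rows is invented purely for illustration. Only the timestamp format and the `put_item(Item=...)` call shape are taken from the question.

```python
from datetime import datetime, timedelta
import uuid


class FakeTable:
    """Hypothetical in-memory stand-in for a DynamoDB table with one partition key."""

    def __init__(self, key_name):
        self.key_name = key_name
        self.items = {}

    def put_item(self, Item):
        # As with DynamoDB's PutItem, an item whose key already exists
        # replaces the stored item instead of adding a new one.
        self.items[Item[self.key_name]] = Item


def timestamp_key(moment):
    # Same format as in the question: truncated to whole seconds.
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")


rows = [{"mKey5": "row-{}".format(i)} for i in range(10)]
base = datetime(2024, 1, 1, 12, 0, 0)  # arbitrary start time (assumption)

by_timestamp = FakeTable("lastModifiedDate")
by_unique = FakeTable("recordId")  # "recordId" is an assumed key name

for i, row in enumerate(rows):
    # Pretend each row is processed 150 ms after the previous one.
    processed_at = base + timedelta(milliseconds=150 * i)
    stamp = timestamp_key(processed_at)

    # Timestamp as partition key: rows in the same second collide.
    by_timestamp.put_item(Item=dict(row, lastModifiedDate=stamp))

    # Generated unique key: every row is kept, the timestamp is a plain attribute.
    by_unique.put_item(Item=dict(row, recordId=str(uuid.uuid4()), lastModifiedDate=stamp))

print(len(by_timestamp.items), len(by_unique.items))
```

With this spacing the ten rows fall into two distinct seconds, so the timestamp-keyed table ends up with 2 items while the uniquely keyed one keeps all 10. If a column such as mKey5 is already unique per record in your data, it could serve as the partition key instead of a generated UUID; whether it is unique is something only you can confirm.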