I have written a Python Lambda function that reads a manifest and writes the records of each CSV file it lists to a DynamoDB table.
However, when I run a sample test in which the manifest lists a single CSV file containing only 10 records, only 2 records end up in the DynamoDB table.
Here is the key index used, along with the read and write capacities and the projected attributes.
Here is the manifest, which lists the CSV file of 10 records.
Here is the integration code that loads the records of the manifest's CSV file into the DynamoDB table:
import io
from datetime import timezone

# Raw CSV values that mean "no data": blank cells (csv yields '' for ",,") and
# the '\N' null marker. Values are compared after stripping whitespace, so the
# single space the original checked for is still treated as missing.
_NULL_MARKERS = ('', '\\N')

# Address columns copied into the nested 'serviceAddress' map
# (CSV column -> map key). The zip column is handled separately below.
_ADDRESS_FIELDS = (
    ('cac_addr_full', 'address'),
    ('cac_addr_city', 'city'),
    ('cac_addr_state', 'state'),
)

# Household columns copied into the nested 'householdInfo' map under their
# CSV column names.
_HOUSEHOLD_FIELDS = (
    'cac_hh_built_year',
    'cac_home_sq_foot_num',
    'cac_demo_num_in_hh',
    'cac_demo_num_adults',
    'cac_demo_hoh_age',
    'cac_hh_home_own',
    'cac_demo_num_kids',
    'cac_demo_income',
    'cac_int_112',
    'cac_silh_super',
    'lifedriver_class',
    'CAC_HH_AIRCONDITIONING',
    'CAC_HH_HOMEHEATINDICATOR',
    'CAC_HH_RES_LENGTH',
)


def _csv_value(row, column):
    """Return ``row[column]`` as a str, or None when the cell holds no data.

    Raises KeyError when the CSV header has no such column, exactly as the
    direct ``row[...]`` indexing did.
    """
    value = row[column]
    # csv.DictReader fills missing trailing cells with None (its default restval).
    if value is None or value.strip() in _NULL_MARKERS:
        return None
    return str(value)


try:
    # Fetch and decode the object. csv must read from a file-like object opened
    # with newline='' so line breaks inside quoted fields are preserved;
    # pre-splitting the text on '\n' silently drops them.
    body = obj.get()['Body'].read().decode('latin-1')
    reader = csv.DictReader(io.StringIO(body, newline=''), delimiter=',')
    for row in reader:
        # NOTE(review): put_item replaces any existing item that has the same
        # primary key. If the table is keyed on lastModifiedDate (a timestamp
        # with one-second resolution), every row processed within the same
        # second overwrites the previous one. Key the table on a value that is
        # unique per row (e.g. mKey5, if it is unique) instead.
        record = {}
        service_address = {}
        household_info = {}

        zip_code = _csv_value(row, 'cac_addr_zip')
        if zip_code is not None:
            # Strip any '.<digits>' suffix (presumably from a float-formatted
            # export) and left-pad to 5 digits to restore leading zeros.
            service_address['zip'] = zip_code.split('.')[0].zfill(5)

        mkey5 = _csv_value(row, 'mKey5')
        if mkey5 is not None:
            record['mKey5'] = mkey5

        for column, key in _ADDRESS_FIELDS:
            value = _csv_value(row, column)
            if value is not None:
                service_address[key] = value

        for column in _HOUSEHOLD_FIELDS:
            value = _csv_value(row, column)
            if value is not None:
                household_info[column] = value

        # Same UTC format as before; computed once per row.
        record['lastModifiedDate'] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        record['serviceAddress'] = service_address
        record['householdInfo'] = household_info
        print('<< RECORDS FOR FILE NAME >> ', file_name, record)
        # Write each item directly. The original opened table.batch_writer()
        # but never used it, so writes already went through table.put_item;
        # this also keeps `count` equal to the number of completed writes.
        table.put_item(Item=record)
        count += 1
except Exception as e:
    message = "Process Failed, count = {count} and reason: ".format(count=count)
    print(message, e)
Here is the Lambda's log output, which shows that 10 records were written to the DynamoDB table.
Finally, here is the DynamoDB table, which contains only 2 records. Thanks!
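You're using `datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")` (stored as `lastModifiedDate`) as the partition key for your items. That timestamp only has one-second resolution, so every item written within the same second gets the same key, and each `put_item` overwrites the previous item. That is why 10 rows collapse into just 2 items. In DynamoDB the primary key must be unique: the partition key alone if the table has no sort key, or the combination of partition key and sort key if it does.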
Make sure every item in the base table has a unique primary key, for example a per-row identifier such as `mKey5`, if it is unique.