I'm having a problem with BytesIO
library in Python. I want to convert a pdf file that I have retrieved from an S3 bucket, and convert it into a dataframe using a custom function convert_bytes_to_df
. The first pdf file is fine to convert to a csv, however subsequent csvs look like they have appended to each other. I have tried to reset the IO with seek
and truncate
but it doesn't seem to work. What am I doing wrong?
import boto3
from io import BytesIO,StringIO
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.ERROR)
logging.getLogger(__name__).setLevel(logging.DEBUG)
session = boto3.Session()
s3 = session.resource('s3')
src_bucket = s3.Bucket('input-bucket')
dest_bucket = s3.Bucket('output-bucket')
csv_buffer = StringIO()
def lambda_handler(event,context):
msg = event['Records'][0]['Sns']['Message']
pdf_files = json.loads(msg)['pdf_files']
location = json.loads(msg)['location']
total_files= len(pdf_files)
LOGGER.info('Processing: {}'.format(json.dumps(pdf_files)))
for pdf_file in pdf_files:
file_name = pdf_file['key']
obj = s3.Object(src_bucket.name,file_name)
fs = BytesIO(obj.get()['Body'].read())
df = convert_bytes_to_df(fs)
df.to_csv(csv_buffer,index=False)
s3.Object(dest_bucket.name, location +"/"+file_name.split('.')[0]+".csv").put(Body=csv_buffer.getvalue())
fs.seek(0)
fs.truncate(0)
LOGGER.info('Processed: {} in {}'.format(file_name,location))
LOGGER.info('Converted {} files: {}'.format(total_files,json.dumps(pdf_files)))
src_bucket.objects.all().delete()
LOGGER.info('Deleted all files from {}'.format(src_bucket.name))
move
csv_buffer = StringIO()
inside for loop.
csv_buffer is initialized only once.
you need it to be inside for loop so that it is getting initialized for each element in the loop.
e.g:
for pdf_file in pdf_files:
csv_buffer = StringIO()
file_name = pdf_file['key']
obj = s3.Object(src_bucket.name,file_name)
fs = BytesIO(obj.get()['Body'].read())
df = convert_bytes_to_df(fs)
df.to_csv(csv_buffer,index=False)
s3.Object(dest_bucket.name, location +"/"+file_name.split('.')[0]+".csv").put(Body=csv_buffer.getvalue())
fs.seek(0)
fs.truncate(0)