I am trying to write a Lambda function that streams a huge CSV file from an S3 bucket and converts it into multiple small JSON files (say, one JSON file per 2000 rows) written back to S3. However, I have some restrictions, such as running within a memory limit of 256 MB.
I am able to do this by fetching the whole file instead of streaming it, as shown below.
But because of the memory constraint, I need to handle this with streams. Is there a way to do the same using streams?
// transformationClass.js
const csv = require('csvtojson');

const extension = '.json';
const chunkSize = 2000; // rows per output JSON file

class S3CsvToJson {
  static async perform(input, output, headers) {
    // `s3` is a thin S3 client wrapper (see the sketch after this class)
    const s3Object = await s3.getObject(input.key, input.bucketName); // getting the whole s3 object
    const csvString = s3Object.Body.toString('utf8');
    const jsonArray = await csv({ noheader: false }).fromString(csvString);
    const fileNames = await S3CsvToJson.writeToFile(jsonArray, output);
    return { files: fileNames };
  }

  static async writeToFile(jsonArray, output) {
    const fileNames = [];
    let outFile;
    if (jsonArray && Array.isArray(jsonArray)) {
      let fileIterator = 1;
      while (jsonArray.length) {
        outFile = `${output.key}-${fileIterator}${extension}`;
        // write the next chunk of rows to s3
        await s3.putObject(
          outFile,
          output.bucketName,
          JSON.stringify(jsonArray.splice(0, chunkSize)),
        );
        console.log('rows left :', jsonArray.length);
        fileNames.push(outFile);
        fileIterator += 1;
      }
    }
    return fileNames;
  }
}

module.exports = S3CsvToJson;
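(Note: the `s3` object used above is not shown in the post; I am assuming it is a small wrapper around the AWS SDK v2 client, roughly like the sketch below. The file name `s3Wrapper.js` and the exact method signatures are my guesses based on how it is called.)
// s3Wrapper.js (sketch, not part of the original code)
const AWS = require('aws-sdk');

const client = new AWS.S3();

module.exports = {
  // returns the whole object, so Body must fit in memory
  getObject: (key, bucketName) => client
    .getObject({ Bucket: bucketName, Key: key })
    .promise(),
  putObject: (key, bucketName, body) => client
    .putObject({ Bucket: bucketName, Key: key, Body: body })
    .promise(),
};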
Here is the handler function:
// handler.js
const s3CsvToJson = require('./transformationClass');

module.exports.perform = async (event, context, callback) => {
  context.callbackWaitsForEmptyEventLoop = false;
  await s3CsvToJson.perform(event.input, event.output, event.headerMapping)
    .then((result) => callback(null, result));
  console.log('leaving - ', Date.now());
};
Thanks in advance!!
After going through a lot of material, I finally worked out a way to get this done.
What I had to do was wrap the whole process in a promise and return it. I created a read stream from S3, forwarded it to the parser, and then to the write stream. I wanted to share it here so it could be useful for others, and I am open to any better-optimized solutions.
// transformationClass.js
const csv = require('fast-csv');
const { Transform, pipeline } = require('stream');

const extension = '.json';

class S3CsvToJson {
  static async perform(input, output, headers) {
    console.log(input, output, headers);
    const threshold = 2000; // rows per output JSON file
    try {
      const promiseTransformData = () => new Promise((resolve, reject) => {
        try {
          let jsonOutput = [];
          let fileCounter = 0;
          const fileNames = [];
          const writableStream = new Transform({
            objectMode: true,
            autoDestroy: true,
            async transform(data, _, next) {
              // once the threshold is reached, flush the buffered rows to s3
              if (jsonOutput.length === threshold) {
                fileCounter += 1;
                const fileUpload = new Promise((resolveWriter) => {
                  s3
                    .putObject(
                      `${output.key}-${fileCounter}${extension}`,
                      output.bucketName,
                      JSON.stringify(jsonOutput),
                    )
                    .then(() => resolveWriter());
                });
                await fileUpload;
                fileNames.push(`${output.key}-${fileCounter}${extension}`);
                jsonOutput = [];
              }
              jsonOutput.push(data);
              next();
            },
          });
          const readFileStream = s3.getReadStream(input.key, input.bucketName);
          pipeline(
            readFileStream,
            csv.parse({ headers: true }),
            writableStream,
            (error) => {
              if (error) {
                console.error(`Error occurred in pipeline - ${error}`);
                resolve({ errorMessage: error.message });
              }
            },
          );
          // write whatever rows are left once the stream has finished
          writableStream.on('finish', async () => {
            if (jsonOutput.length) {
              fileCounter += 1;
              const fileUpload = new Promise((resolveWriter) => {
                s3
                  .putObject(
                    `${output.key}-${fileCounter}${extension}`,
                    output.bucketName,
                    JSON.stringify(jsonOutput),
                  )
                  .then(() => resolveWriter());
              });
              await fileUpload;
              fileNames.push(`${output.key}-${fileCounter}${extension}`);
              jsonOutput = [];
            }
            console.log({ status: 'Success', files: fileNames });
            resolve({ status: 'Success', files: fileNames });
          });
        } catch (error) {
          console.error(`Error occurred while transformation - ${error}`);
          resolve({ errorMessage: error ? error.message : null });
        }
      });
      return await promiseTransformData();
    } catch (error) {
      return error.message || error;
    }
  }
}

module.exports = S3CsvToJson;
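Again, the `s3` helper is not shown here; the difference from the earlier sketch is that `getReadStream` hands back a stream instead of a buffered body, which is what keeps memory usage low. Assuming the AWS SDK v2 client, it could look roughly like this (the module name and signatures are my assumption):
// s3Wrapper.js (sketch, not part of the original code)
const AWS = require('aws-sdk');

const client = new AWS.S3();

module.exports = {
  // streams the object body instead of buffering the whole file in memory
  getReadStream: (key, bucketName) => client
    .getObject({ Bucket: bucketName, Key: key })
    .createReadStream(),
  // same chunked-upload helper as in the earlier sketch
  putObject: (key, bucketName, body) => client
    .putObject({ Bucket: bucketName, Key: key, Body: body })
    .promise(),
};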
And in the handler I call S3CsvToJson like this:
// handler.js
const s3CsvToJson = require('./transformationClass');

module.exports.perform = async (event, context, callback) => {
  context.callbackWaitsForEmptyEventLoop = false;
  await s3CsvToJson.perform(event.input, event.output, event.headerMapping)
    .then((result) => callback(null, result))
    .catch((error) => callback(error));
};
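If anyone wants to try it out, the handler expects an event shaped like the one below (derived from how `event` is read in the code; the bucket names, keys, and the `invokeLocal.js` file are just placeholders for a local smoke test):
// invokeLocal.js (sketch with placeholder values)
const handler = require('./handler');

const event = {
  input: { bucketName: 'my-source-bucket', key: 'big-file.csv' },
  output: { bucketName: 'my-destination-bucket', key: 'big-file' },
  headerMapping: {}, // passed through to perform(), but unused by the streaming version
};

// the callback should receive { status: 'Success', files: [...] } on success
handler.perform(event, {}, (error, result) => {
  if (error) console.error(error);
  else console.log(result);
});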
Hope it was helpful. Thanks!