Here's my Lambda function:
const AWS = require('aws-sdk');
//const dynamo = new AWS.DynamoDB.DocumentClient();
//const events ="events"
exports.handler = function(event, context) {
event.Records.forEach(function(record) {
console.log('Data payload: ', record.kinesis.data);
//Kinesis data is base64 encoded so decode here
var payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
console.log('Decoded payload:', payload);
var data = JSON.parse(payload);
console.log("Data: %j", data);
console.log("Data value: %s", data.value);
var clean = JSON.stringify(data.value).replace(/[`~!@#$%^&*()_|+\-=?;'",.<>\{\}\[\]\\\/]/gi, '')
console.log("Data clean: %s", clean);
var values = clean.split(":");
var unit = values[0];
var metric = values[1];
console.log("Data unit: %s", unit);
console.log("Data metric: %s", metric);
console.log("Writing records");
const currentTime = Date.now().toString(); // Unix time in milliseconds
const item = [
"deviceId": context.awsRequestId,
"timestamp": data.datetime,
"time": currentTime.toString(),
"type": data.topic,
"metric": metric,
"forge": "uuid",
"unit": unit
};
var params = {
TableName: events,
Item:{
"deviceId": context.awsRequestId,
"timestamp": data.datetime,
"type": data.topic,
"metric": metric,
"forge": "uuid",
"unit": unit
}
};
// Instead of saving data to dynamo. I would like to store it on timestream.
/*console.log("Saving Telemetry Data");
dynamo.put(params, function(err, data) {
if (err) {
console.error("Unable to add event. Error JSON:", JSON.stringify(err, null, 2));
context.fail();
} else {
console.log(data);
console.log("Data saved:", JSON.stringify(params, null, 2));
context.succeed();
return {"message": "Item created in DB"};
}
});
*/
});
};
How can I replicate the DynamoDB flow with the Timestream SDK, or is there a better/easier way?
const AWS = require('aws-sdk');
const timestreamwrite = new AWS.TimestreamWrite();
exports.handler = function(event, context) {
event.Records.forEach(function(record) {
console.log('Data payload: ', record.kinesis.data);
//Kinesis data is base64 encoded so decode here
var payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
console.log('Decoded payload:', payload);
var data = JSON.parse(payload);
console.log("Data: %j", data);
var clean = JSON.stringify(data.value).replace(/[`~!@#$%^&*()_|+\-=?;'",.<>\{\}\[\]\\\/]/gi, '')
var values = clean.split(":");
var unit = values[0];
var metric = values[1];
const currentTime = Date.now().toString(); // Unix time in milliseconds
const records = [];
records.push({
Dimensions: [{
Name: 'sensor',
Value: data.topic,
DimensionValueType: 'VARCHAR'
}],
MeasureName: unit,
MeasureValue: metric,
MeasureValueType: 'DOUBLE',
Time: currentTime.toString()
});
const params = {
DatabaseName: "greenforge-measurement",
TableName: "sensor-events",
Records: records
};
console.log("Saving Telemetry Data");
timestreamwrite.writeRecords(params, function(err, data) {
if (err) {
console.error("Unable to add event. Error JSON:", JSON.stringify(err, null, 2));
context.fail();
} else {
console.log(data);
console.log("Data saved:", JSON.stringify(params, null, 2));
context.succeed();
return {"message": "Item created in Timestream"};
}
});
});
};