I want to enable DynamoDB streams on my lambda using AWS CDK which I am able to do but I also want to enable the filter criteria on lambda
But I am getting this error:
Invalid filter pattern definition. (Service: AWSLambda; Status Code: 400; Error Code: InvalidParameterValueException
This is the event I am getting from DynamoDB streams:
{
"input": {
"Records": [
{
"eventID": "e92e0072a661a06df0e62e411f",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "<region>",
"dynamodb": {
"ApproximateCreationDateTime": 1639500357,
"Keys": {
"service": {
"S": "service"
},
"key": {
"S": "key"
}
},
"NewImage": {
"service": {
"S": "service"
},
"channel": {
"S": "email"
},
"key": {
"S": "key"
}
},
"SequenceNumber": "711500000000015864417",
"SizeBytes": 168,
"StreamViewType": "NEW_IMAGE"
},
"eventSourceARN": "arn:aws:dynamodb:<region>:<account>:table/table-name/stream/2021-12-14T13:00:29.888"
}
]
},
"env": {
"lambdaContext": {
"callbackWaitsForEmptyEventLoop": true,
"functionVersion": "$LATEST",
"functionName": "functionName",
"memoryLimitInMB": "128",
"logGroupName": "/aws/lambda/functionName",
"logStreamName": "2021/12/14/[$LATEST]028531c7b489b8ec69bace700acc0",
"invokedFunctionArn": "arn:aws:lambda:<region>:<account>:function:functionName",
"awsRequestId": "c72e80252-4722-b9f0-a03b7f8b820e"
},
"region": "<region-name>"
}
}
The event source mapping code is:
const mapping = new lambda.CfnEventSourceMapping(this, 'event', {
functionName: "functionName,
batchSize: 1,
bisectBatchOnFunctionError: true,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
eventSourceArn: <stream-arn>,
filterCriteria: filter,
});
I want to get the eventName to be INSERT
and the channel to be email here. What should be the value of the filter criteria? Its not working for me
The original workaround is no longer necessary. The CDK now has event-source filters for Lambda, Kinesis and SQS. Pass the filter to the L2 EventSourceMapping
construct:
const source: EventSourceMapping = new lambda.EventSourceMapping(this, "EventSourceMapping",{
target: func,
eventSourceArn: table.tableStreamArn,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
filters: [
lambda.FilterCriteria.filter({
eventName: lambda.FilterRule.isEqual("INSERT"),
dynamodb: { NewImage: { channel: { S: lambda.FilterRule.isEqual("email") } },},
}),
],
}
);
Here's the DynamoDB streams filter Pattern
syntax for new records with a channel
of email
:
`{ \"eventName\": [\"INSERT\"], \"dynamodb\": { \"NewImage\": {\"channel\": { \"S\" : [\"email\"]}} } }`
In other words, the Pattern
is a stringified JSON filter rule with escaped quotes. The pattern is applied against each stream record.
Here is the full CDK syntax. The code starts with the usual L2 EventSourceMapping
. It then uses escape hatch syntax to set FilterCriteria
on the underlying L1 CfnEventSourceMapping
:
// start with the L2 type - Note: the OP code starts with a L1 `CfnEventSourceMapping`
const source: EventSourceMapping = new lambda.EventSourceMapping(this, 'EventSourceMapping', {
target: func,
eventSourceArn: table.tableStreamArn,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
});
// escape hatch - get a L1 reference
const cfnSouce = source.node.defaultChild as lambda.CfnEventSourceMapping;
cfnSouce.addPropertyOverride('FilterCriteria', {
Filters: [
{
Pattern: `{ \"eventName\": [\"INSERT\"], \"dynamodb\": { \"NewImage\": {\"channel\": { \"S\" : [\"email\"]}} } }`,
},
],
});