aws-lambda, amazon-dynamodb, aws-cdk, amazon-dynamodb-streams

Filter Criteria in Lambda Function


I want to use DynamoDB Streams as an event source for my Lambda function using the AWS CDK, which I am able to do, but I also want to set filter criteria on the event source mapping so the function is only invoked for matching records.

But I am getting this error:

Invalid filter pattern definition. (Service: AWSLambda; Status Code: 400; Error Code: InvalidParameterValueException

This is the event I am getting from DynamoDB streams:

{
    "input": {
        "Records": [
            {
                "eventID": "e92e0072a661a06df0e62e411f",
                "eventName": "INSERT",
                "eventVersion": "1.1",
                "eventSource": "aws:dynamodb",
                "awsRegion": "<region>",
                "dynamodb": {
                    "ApproximateCreationDateTime": 1639500357,
                    "Keys": {
                        "service": {
                            "S": "service"
                        },
                        "key": {
                            "S": "key"
                        }
                    },
                    "NewImage": {
                        "service": {
                            "S": "service"
                        },
                        "channel": {
                            "S": "email"
                        },
                        "key": {
                            "S": "key"
                        }
                    },
                    "SequenceNumber": "711500000000015864417",
                    "SizeBytes": 168,
                    "StreamViewType": "NEW_IMAGE"
                },
                "eventSourceARN": "arn:aws:dynamodb:<region>:<account>:table/table-name/stream/2021-12-14T13:00:29.888"
            }
        ]
    },
    "env": {
        "lambdaContext": {
            "callbackWaitsForEmptyEventLoop": true,
            "functionVersion": "$LATEST",
            "functionName": "functionName",
            "memoryLimitInMB": "128",
            "logGroupName": "/aws/lambda/functionName",
            "logStreamName": "2021/12/14/[$LATEST]028531c7b489b8ec69bace700acc0",
            "invokedFunctionArn": "arn:aws:lambda:<region>:<account>:function:functionName",
            "awsRequestId": "c72e80252-4722-b9f0-a03b7f8b820e"
        },
        "region": "<region-name>"
    }
}

The event source mapping code is:

// L1 event source mapping that connects the DynamoDB stream to the function.
// `<stream-arn>` is a placeholder for the stream's ARN, and `filter` is the
// filter-criteria value this question asks about (its definition is not shown).
const mapping = new lambda.CfnEventSourceMapping(this, 'event', {
  functionName: "functionName",
  batchSize: 1,
  bisectBatchOnFunctionError: true,
  startingPosition: lambda.StartingPosition.TRIM_HORIZON,
  eventSourceArn: <stream-arn>,
  filterCriteria: filter,
});

I want the function to receive only records where the eventName is INSERT and the channel is email. What should the value of the filter criteria be? It's not working for me.


Solution

  • <Edit> CDK filter helpers added in v2.42.0

    The original workaround is no longer necessary. The CDK now has event-source filters for DynamoDB, Kinesis and SQS. Pass the filter to the L2 EventSourceMapping construct:

    // Only deliver INSERT records whose new image has channel = "email".
    // The rule is expressed with the CDK filter helpers instead of a
    // hand-written JSON pattern string.
    const source = new lambda.EventSourceMapping(this, "EventSourceMapping", {
      target: func,
      eventSourceArn: table.tableStreamArn,
      startingPosition: lambda.StartingPosition.TRIM_HORIZON,
      filters: [
        lambda.FilterCriteria.filter({
          eventName: lambda.FilterRule.isEqual("INSERT"),
          dynamodb: {
            NewImage: { channel: { S: lambda.FilterRule.isEqual("email") } },
          },
        }),
      ],
    });
    

    </Edit>


    Here's the DynamoDB streams filter Pattern syntax for new records with a channel of email:

    `{ \"eventName\": [\"INSERT\"], \"dynamodb\": { \"NewImage\": {\"channel\": { \"S\" : [\"email\"]}} } }`
    

    In other words, the Pattern is a stringified JSON filter rule with escaped quotes. The pattern is applied against each stream record.

    Here is the full CDK syntax. The code starts with the usual L2 EventSourceMapping. It then uses escape hatch syntax to set FilterCriteria on the underlying L1 CfnEventSourceMapping:

    // Start with the L2 construct. Note: the OP code starts with an L1 `CfnEventSourceMapping`.
    const source = new lambda.EventSourceMapping(this, 'EventSourceMapping', {
      target: func,
      eventSourceArn: table.tableStreamArn,
      startingPosition: lambda.StartingPosition.TRIM_HORIZON,
    });

    // Escape hatch: get a reference to the underlying L1 resource.
    const cfnSource = source.node.defaultChild as lambda.CfnEventSourceMapping;

    // Set FilterCriteria directly on the L1 resource. The Pattern is the
    // stringified JSON filter rule: eventName INSERT and NewImage.channel.S "email".
    cfnSource.addPropertyOverride('FilterCriteria', {
      Filters: [
        {
          Pattern: `{ \"eventName\": [\"INSERT\"], \"dynamodb\": { \"NewImage\": {\"channel\": { \"S\" : [\"email\"]}} } }`,
        },
      ],
    });