I am using EventBridge Pipes to connect DynamoDB Streams directly to EventBridge. However, I am having difficulty transforming DynamoDB list attributes to my desired format using the Pipes Target Input Transformer.
When I insert an item into DynamoDB, I want to publish a UserCreated event to EventBridge without a Lambda function. I also want to remove the DynamoDB formatting from the event payload.
My DDB Stream event payload looks like this:
{
  "eventID": "c814968f8803051fa5700a2a0b9fe599",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-1",
  "dynamodb": {
    "ApproximateCreationDateTime": 1672592664,
    "Keys": {
      "sk": {
        "S": "PROFILE"
      },
      "pk": {
        "S": "USER#testuser"
      }
    },
    "NewImage": {
      "username": {
        "S": "testuser"
      },
      "roles": {
        "L": [{
          "S": "admin"
        },{
          "S": "otherrole"
        }]
      }
    },
    "SequenceNumber": "3044600000000017324272463",
    "SizeBytes": 505,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  }
}
I want my EventBridge event payload to look like this:
{
  "data": {
    "username": "testuser",
    "roles": ["admin","otherrole"]
  },
  "metadata": {
    "eventType": "UserCreated"
  }
}
However, using the following Input Transformer:
{
  "data":{
    "username": <$.dynamodb.NewImage.username.S>,
    "roles": <$.dynamodb.NewImage.roles.L>
  },
  "metadata":{
    "eventType":"UserCreated"
  }
}
I get the following output (notice that the roles field still contains DDB formatting):
{
  "data": {
    "username": "testuser",
    "roles": [{
      "S": "admin"
    },{
      "S": "otherrole"
    }]
  },
  "metadata": {
    "eventType": "UserCreated"
  }
}
I do not want to publish DynamoDB formatted lists in my event payloads.
If I list out each list index manually, I'm able to get close to what I want:
{
  "roles": [<$.dynamodb.NewImage.roles.L.0.S>,<$.dynamodb.NewImage.roles.L.1.S>]
}
Which gives me the following output:
{
  "roles": ["admin","otherrole"]
}
However, the size of this list is dynamic, so this approach will not work for me.
I've also used JSONPath.com to verify that the following JSONPath expression is valid. However, the Pipes Input Transformer rejects it as invalid syntax:
$.dynamodb.NewImage.roles[*][*].S
I may be able to use a Lambda Function to format the data to my liking in the Enrichment step of my Pipe. However, introducing a Lambda Function would defeat the purpose of using Pipes for my use case.
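For reference, the formatting logic such a Lambda function would need is small. The following is only a minimal sketch in TypeScript, assuming that just the S, N, BOOL, NULL, L, and M attribute types appear in the item. The `AttributeValue` type and the `unmarshalAttribute` and `toUserCreatedPayload` helpers are hypothetical names introduced for illustration, not part of any SDK:

```typescript
// Hypothetical subset of the DynamoDB attribute-value shapes found in stream records.
type AttributeValue =
  | { S: string }
  | { N: string }
  | { BOOL: boolean }
  | { NULL: boolean }
  | { L: AttributeValue[] }
  | { M: { [key: string]: AttributeValue } };

// Recursively strips the DynamoDB type wrappers from a single attribute value.
function unmarshalAttribute(value: AttributeValue): unknown {
  if ('S' in value) return value.S;
  if ('N' in value) return Number(value.N); // numbers arrive as strings; very large values may lose precision
  if ('BOOL' in value) return value.BOOL;
  if ('NULL' in value) return null;
  if ('L' in value) return value.L.map(unmarshalAttribute);
  if ('M' in value) {
    const result: { [key: string]: unknown } = {};
    for (const key of Object.keys(value.M)) {
      result[key] = unmarshalAttribute(value.M[key]);
    }
    return result;
  }
  throw new Error('Unsupported attribute type');
}

// Builds the desired event payload from a stream record's NewImage.
function toUserCreatedPayload(newImage: { [key: string]: AttributeValue }) {
  return {
    data: unmarshalAttribute({ M: newImage }),
    metadata: { eventType: 'UserCreated' },
  };
}
```

Calling `toUserCreatedPayload` with the `NewImage` from the stream event above would yield plain `username` and `roles` values under `data`, regardless of how many roles the list contains.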
I was able to solve this by using an Express state machine as the enrichment step. I wrote a blog post about it: Solving the DynamoDB EventBridge Pipes Problem. Below is the CDK code I used:
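// Assumes aws-cdk-lib v2. `table` (with a stream enabled) and `defaultEventBus`
// are defined elsewhere in the stack.
import { Role, ServicePrincipal, PolicyDocument, PolicyStatement } from 'aws-cdk-lib/aws-iam';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { StateMachine, StateMachineType, Map, Pass } from 'aws-cdk-lib/aws-stepfunctions';

// Express state machine used as the enrichment step: the Map state iterates over
// the batch of stream records and the Pass state reshapes each record.
const userCreatedEnrichment = new StateMachine(this, 'UserCreatedEnrichment', {
  definition: new Map(this, 'UserCreatedEnrichmentMap', {}).iterator(
    new Pass(this, 'UserCreatedEnrichmentPass', {
      parameters: {
        'username.$': '$.dynamodb.NewImage.username.S',
        // L[*].S collects the S value of every list element, whatever the list size
        'roles.$': '$.dynamodb.NewImage.roles.L[*].S',
      },
    }),
  ),
  stateMachineType: StateMachineType.EXPRESS,
});

// Role assumed by the pipe: read from the stream, start the enrichment, put events on the bus.
const pipeRole = new Role(this, 'PipeRole', {
  assumedBy: new ServicePrincipal('pipes.amazonaws.com'),
  inlinePolicies: {
    sourcePolicy: new PolicyDocument({
      statements: [
        new PolicyStatement({
          // tableStreamArn is typed as possibly undefined; it is set when the table has a stream
          resources: [table.tableStreamArn!],
          actions: ['dynamodb:DescribeStream', 'dynamodb:GetRecords', 'dynamodb:GetShardIterator', 'dynamodb:ListStreams'],
        }),
      ],
    }),
    enrichmentPolicy: new PolicyDocument({
      statements: [
        new PolicyStatement({
          resources: [userCreatedEnrichment.stateMachineArn],
          actions: ['states:Start*'],
        }),
      ],
    }),
    targetPolicy: new PolicyDocument({
      statements: [
        new PolicyStatement({
          resources: [defaultEventBus.eventBusArn],
          actions: ['events:PutEvents'],
        }),
      ],
    }),
  },
});

// The pipe itself: DynamoDB stream -> state machine enrichment -> event bus.
new CfnPipe(this, 'UserCreatedPipe', {
  description: 'Sends UserCreated events',
  roleArn: pipeRole.roleArn,
  source: table.tableStreamArn!,
  target: defaultEventBus.eventBusArn,
  sourceParameters: {
    dynamoDbStreamParameters: {
      startingPosition: 'LATEST',
      batchSize: 1,
    },
  },
  enrichment: userCreatedEnrichment.stateMachineArn,
  targetParameters: {
    eventBridgeEventBusParameters: {
      detailType: 'UserCreated',
      source: 'MySource',
    },
  },
});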
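To make the effect of the Map and Pass states easier to follow, here is a rough plain-TypeScript equivalent of what the enrichment does to its input. This is only a sketch, assuming the pipe hands the enrichment a JSON array of stream records (a single element here, since batchSize is 1). The `UserStreamRecord` type and the `passState` and `enrich` functions are hypothetical names used for illustration:

```typescript
// Minimal shape of the stream record fields the Pass state reads.
interface UserStreamRecord {
  dynamodb: {
    NewImage: {
      username: { S: string };
      roles: { L: { S: string }[] };
    };
  };
}

// Mirrors the Pass state's parameters for one record:
//   'username.$': '$.dynamodb.NewImage.username.S'
//   'roles.$':    '$.dynamodb.NewImage.roles.L[*].S'
function passState(record: UserStreamRecord) {
  const image = record.dynamodb.NewImage;
  return {
    username: image.username.S,
    // L[*].S: take the S value of every element in the list
    roles: image.roles.L.map((role) => role.S),
  };
}

// Mirrors the Map state: apply the Pass state to each record in the batch.
function enrich(batch: UserStreamRecord[]) {
  return batch.map(passState);
}
```

Because the list is mapped element by element, the result does not depend on how many roles the item has, which is what the index-by-index Input Transformer could not handle.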