I am trying to run several Step Functions executions: I read the records in my sample data and start the state machine once for each record.
Here is the setup:
Function0 starts n state machine executions, one per record (see below).
This sample data is passed to Function0:
Sample data
{
  "data": [
    "Aa",
    "Bb",
    "Cc",
    "Dd",
    "Ee",
    "Ff",
    "Gg",
    "Hh",
    "Ii",
    "Jj",
    "Kk",
    "Ll",
    "Mm",
    "Nn",
    "Oo",
    "Pp",
    "Qq",
    "Rr",
    "Ss",
    "Tt",
    "Uu",
    "Vv",
    "Ww",
    "Xx",
    "Yy",
    "Zz"
  ]
}
Here is Function0, which starts one state machine execution per entry in the sample data (n entries, n executions):
Function0
import json
import boto3

def handler(event, context):
    # Create a Step Functions client
    client = boto3.client('stepfunctions')
    # Extract the data from the event
    items = event.get("data", [])
    execution_arns = []
    for item in items:
        input_data = {
            "data": item
        }
        # Start the execution of the Step Function
        response = client.start_execution(
            stateMachineArn='arn:aws:states:us-west-2:accnt:stateMachine:MyStateMachine-uPr0L9VbnFx7',
            input=json.dumps(input_data)
        )
        # Log the response from the Step Function
        print(f"Step Function execution started for item {item}: {response['executionArn']}")
        # Add the execution ARN to the list
        execution_arns.append(response['executionArn'])
    # Return all execution ARNs as part of the response
    return {
        "statusCode": 200,
        "executionArns": execution_arns
    }
Each Lambda called by the state machine looks roughly like the one below. Only the name of the returned key changes from function to function (f1_output, f2_output, f3_output, f4_output, f5_output); otherwise they all do the same thing to the data: read it from the event, prefix it with the function number (1., 2., 3., etc.), and return it in a list together with a status code.
Typical Lambda
def handler(event, context):
    data = event.get("data", [])
    processed_data = [f"1.{data}"]
    print(f"Function1 processed_data {processed_data}")
    return {
        "statusCode": 200,
        "f1_output": processed_data
    }
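Since the five handlers differ only in the step number and the output key, they could be generated from one helper. A minimal sketch, assuming the helper is packaged with each function's code; `make_handler` is a hypothetical name, not something in the original project:

```python
def make_handler(step_number):
    """Build a handler equivalent to the 'typical Lambda' for one step.

    Hypothetical helper: the only things that vary between Function1..Function5
    are the prefix ("1.", "2.", ...) and the output key ("f1_output", ...).
    """
    output_key = f"f{step_number}_output"

    def handler(event, context):
        data = event.get("data", [])
        # Prefix the incoming data with the step number, wrapped in a list
        processed_data = [f"{step_number}.{data}"]
        print(f"Function{step_number} processed_data {processed_data}")
        return {"statusCode": 200, output_key: processed_data}

    return handler


# What Function1's app.py would export
handler = make_handler(1)
print(handler({"data": "Aa"}, None))  # {'statusCode': 200, 'f1_output': ['1.Aa']}
```

Each function's `app.py` would then only need `handler = make_handler(2)`, `handler = make_handler(3)`, and so on. This removes the duplication; it does not change the throttling behaviour.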
When I run the Lambda test with this sample data, I get:
Error
Rate Exceeded. (Service: Lambda, Status Code: 429, Request ID: 0ef3b2a7-9750-46f2-a7ad-3a43a6920627)
I have added Wait states to the state machine, for example:
"WaitBetween1and2": {
    "Type": "Wait",
    "Seconds": 2,
    "Next": "Function2"
},
and a random sleep (1 to 5 seconds) in Function3 and Function4, but I still keep getting "Rate Exceeded".
How can I run these very short-running Lambdas without hitting the rate limit?
Template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  SAM_GeneratorV1_5
  Sample SAM Template for SAM_GeneratorV1_5
Resources:
  Function0:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: experiment/function0
      Handler: app.handler
      Runtime: python3.9
      Role: !GetAtt Function0ExecutionRole.Arn
      Architectures:
        - x86_64
      Environment:
        Variables:
          DEFAULT_REGION: 'us-west-2'
  Function0ExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: InvokeStepFunctionPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: states:StartExecution
                Resource: arn:aws:states:us-west-2:accnt:stateMachine:MyStateMachine-uPr0L9VbnFx7
  Function1:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: experiment/function1
      Handler: app.handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Environment:
        Variables:
          DEFAULT_REGION: 'us-west-2'
  Function2:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: experiment/function2
      Handler: app.handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Environment:
        Variables:
          DEFAULT_REGION: 'us-west-2'
  Function3:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: experiment/function3
      Handler: app.handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Environment:
        Variables:
          DEFAULT_REGION: 'us-west-2'
  Function4:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: experiment/function4
      Handler: app.handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Environment:
        Variables:
          DEFAULT_REGION: 'us-west-2'
  Function5:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: experiment/function5
      Handler: app.handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Environment:
        Variables:
          DEFAULT_REGION: 'us-west-2'
  MyStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      DefinitionString:
        Fn::Sub: |
          {
            "Comment": "orchestrate multiple Lambda functions",
            "StartAt": "Function1",
            "States": {
              "Function1": {
                "Type": "Task",
                "Resource": "${Function1.Arn}",
                "Retry": [
                  {
                    "ErrorEquals": ["Lambda.TooManyRequestsException"],
                    "IntervalSeconds": 2,
                    "MaxAttempts": 5,
                    "BackoffRate": 2.0
                  }
                ],
                "Next": "WaitBetween1and2"
              },
              "WaitBetween1and2": {
                "Type": "Wait",
                "Seconds": 2,
                "Next": "Function2"
              },
              "Function2": {
                "Type": "Task",
                "Resource": "${Function2.Arn}",
                "Retry": [
                  {
                    "ErrorEquals": ["Lambda.TooManyRequestsException"],
                    "IntervalSeconds": 2,
                    "MaxAttempts": 5,
                    "BackoffRate": 2.0
                  }
                ],
                "Next": "WaitBeforeParallelProcessing"
              },
              "WaitBeforeParallelProcessing": {
                "Type": "Wait",
                "Seconds": 2,
                "Next": "ParallelProcessing"
              },
              "ParallelProcessing": {
                "Type": "Parallel",
                "Branches": [
                  {
                    "StartAt": "Function3",
                    "States": {
                      "Function3": {
                        "Type": "Task",
                        "Resource": "${Function3.Arn}",
                        "Retry": [
                          {
                            "ErrorEquals": ["Lambda.TooManyRequestsException"],
                            "IntervalSeconds": 2,
                            "MaxAttempts": 5,
                            "BackoffRate": 2.0
                          }
                        ],
                        "End": true
                      }
                    }
                  },
                  {
                    "StartAt": "Function4",
                    "States": {
                      "Function4": {
                        "Type": "Task",
                        "Resource": "${Function4.Arn}",
                        "Retry": [
                          {
                            "ErrorEquals": ["Lambda.TooManyRequestsException"],
                            "IntervalSeconds": 2,
                            "MaxAttempts": 5,
                            "BackoffRate": 2.0
                          }
                        ],
                        "End": true
                      }
                    }
                  }
                ],
                "Next": "WaitBeforeFunction5"
              },
              "WaitBeforeFunction5": {
                "Type": "Wait",
                "Seconds": 2,
                "Next": "Function5"
              },
              "Function5": {
                "Type": "Task",
                "Resource": "${Function5.Arn}",
                "Retry": [
                  {
                    "ErrorEquals": ["Lambda.TooManyRequestsException"],
                    "IntervalSeconds": 2,
                    "MaxAttempts": 5,
                    "BackoffRate": 2.0
                  }
                ],
                "End": true
              }
            }
          }
      RoleArn:
        Fn::GetAtt:
          - StepFunctionRole
          - Arn
  StepFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: states.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: StepFunctionInvokePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                Resource:
                  - !GetAtt Function1.Arn
                  - !GetAtt Function2.Arn
                  - !GetAtt Function3.Arn
                  - !GetAtt Function4.Arn
                  - !GetAtt Function5.Arn
Outputs:
  StateMachineArn:
    Description: "ARN of the Step Function"
    Value: !Ref MyStateMachine
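For reference, each Task's Retry rule above (IntervalSeconds 2, MaxAttempts 5, BackoffRate 2.0) waits 2 s before the first retry and multiplies the wait by 2.0 each time. A small sketch of that schedule, assuming no MaxDelaySeconds or JitterStrategy (the template sets neither); `retry_delays` is a hypothetical helper:

```python
def retry_delays(interval_seconds, max_attempts, backoff_rate):
    """Delay (seconds) before each retry of a Step Functions Retry rule.

    Assumes no MaxDelaySeconds cap and no JitterStrategy, so the delays are
    a plain geometric series starting at IntervalSeconds.
    """
    return [interval_seconds * backoff_rate ** i for i in range(max_attempts)]


delays = retry_delays(2, 5, 2.0)
print(delays)       # [2.0, 4.0, 8.0, 16.0, 32.0]
print(sum(delays))  # 62.0
```

Without jitter, every execution that was throttled at the same moment follows this identical schedule, so its retries also arrive at the same moments.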
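The problem is that you are running many executions of your state machine in parallel. Because they all call the same Lambdas at roughly the same moments, you end up with a burst of simultaneous invocations and hit the limit. Fixed Wait states do not help, because they delay every execution by the same amount, so the executions stay in lockstep.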
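A rough model of when the Lambda invocations start makes this concrete. It assumes every execution starts at t=0 and every Lambda returns instantly (ignoring the random sleeps); the offsets and `invocations_per_instant` are illustrative, not measured:

```python
from collections import Counter


def invocations_per_instant(start_times, step_offsets):
    """Count Lambda invocations that begin at each instant.

    start_times: when each execution starts (seconds).
    step_offsets: when each Task starts relative to its execution's start.
    Assumes every Lambda returns instantly, so offsets are just the Wait times.
    """
    return Counter(start + offset for start in start_times for offset in step_offsets)


# Illustrative timing: Function1 at +0, Function2 at +2 (after the 2 s Wait),
# Function3 and Function4 together at +4 (Parallel), Function5 at +6.
# Function0 starts 26 executions (one per letter) at t=0.
counts = invocations_per_instant([0] * 26, [0, 2, 4, 4, 6])
print(sorted(counts.items()))  # [(0, 26), (2, 26), (4, 52), (6, 26)]
```

The Wait states only move each burst later, and the Parallel step doubles it.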
To avoid this, get rid of Function0 and instead wrap your current state machine in a Map state, which runs the steps once for each entry in your `data` list. In the Map state definition you can set a maximum concurrency, so you still get the benefit of running in parallel without going over your limits. You then start the updated state machine once with your full data list, and it handles the concurrency internally.
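If you still want a Lambda as the entry point, it only needs to start one execution with the whole list. A minimal sketch reusing the ARN from the question; `start_single_execution` is a hypothetical helper that takes the client as a parameter so it can be tried with a stub:

```python
import json

STATE_MACHINE_ARN = "arn:aws:states:us-west-2:accnt:stateMachine:MyStateMachine-uPr0L9VbnFx7"


def start_single_execution(client, event):
    """Start ONE execution with the whole list; the Map state fans it out."""
    items = event.get("data", [])
    response = client.start_execution(
        stateMachineArn=STATE_MACHINE_ARN,
        input=json.dumps({"data": items}),
    )
    print(f"Started one execution for {len(items)} items: {response['executionArn']}")
    return {"statusCode": 200, "executionArn": response["executionArn"]}


def handler(event, context):
    # Imported here so the helper above can be exercised without AWS credentials
    import boto3

    return start_single_execution(boto3.client("stepfunctions"), event)
```

You could equally start that single execution from the console or with `aws stepfunctions start-execution`.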
I don't work with CloudFormation directly, so I don't know exactly how to express this in your template, but here is roughly how you would implement it in CDK (TypeScript):
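import { Stack } from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
// Assumes these already exist: your stack and Function1..Function5
declare const stack: Stack;
declare const functions: lambda.IFunction[];
// One LambdaInvoke task per function; payloadResponseOnly passes the return value straight through
const lambdaSteps = functions.map((fn, i) =>
  new tasks.LambdaInvoke(stack, `Invoke Function${i + 1}`, {
    lambdaFunction: fn,
    payloadResponseOnly: true,
  }),
);
// Start with your current step function body
const innerSteps = sfn.Chain.start(lambdaSteps[0])
  .next(lambdaSteps[1])
  .next(lambdaSteps[2])
  .next(lambdaSteps[3])
  .next(lambdaSteps[4]);
// Wrap the steps in a Map state; each iteration takes one element of the `data` list as input
const wrappedSfn = new sfn.Map(stack, 'Run Lambdas for each data element', {
  maxConcurrency: 20,
  itemsPath: sfn.JsonPath.stringAt('$.data'),
  itemSelector: { // Wraps each item in its own JSON with the key "data", as in Function0
    data: sfn.JsonPath.stringAt('$$.Map.Item.Value'),
  },
  resultPath: '$.processedData',
});
wrappedSfn.itemProcessor(innerSteps);
new sfn.StateMachine(stack, 'MyStateMachine', {
  definitionBody: sfn.DefinitionBody.fromChainable(wrappedSfn),
});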
Doing the equivalent in your CloudFormation template should let you process all the items as expected while capping how many run at once, which keeps your Lambdas within their concurrency limits.
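As a starting point for the CloudFormation side, this sketch builds the Amazon States Language for such a Map state as a Python dict, so it can be dumped to JSON and pasted into `DefinitionString`. It assumes an inline Map (`ProcessorConfig` mode `INLINE`); `wrap_in_map` and the state name `ProcessEachItem` are hypothetical:

```python
import json


def wrap_in_map(inner_definition, max_concurrency=20):
    """Wrap an existing ASL workflow (StartAt + States) in an inline Map state
    that runs it once per element of $.data."""
    return {
        "Comment": "Run the original workflow once per element of $.data",
        "StartAt": "ProcessEachItem",
        "States": {
            "ProcessEachItem": {
                "Type": "Map",
                "ItemsPath": "$.data",
                # Cap on how many iterations run at the same time
                "MaxConcurrency": max_concurrency,
                # Give each iteration {"data": <item>}, the same shape Function0 sent
                "ItemSelector": {"data.$": "$$.Map.Item.Value"},
                "ItemProcessor": {
                    "ProcessorConfig": {"Mode": "INLINE"},
                    "StartAt": inner_definition["StartAt"],
                    "States": inner_definition["States"],
                },
                # Keep the original input and add the per-item results under processedData
                "ResultPath": "$.processedData",
                "End": True,
            }
        },
    }


# Abbreviated inner workflow; in the template this would be the existing States block.
inner = {
    "StartAt": "Function1",
    "States": {
        "Function1": {"Type": "Task", "Resource": "${Function1.Arn}", "End": True},
    },
}
print(json.dumps(wrap_in_map(inner, max_concurrency=5), indent=2))
```

The inner `StartAt`/`States` are the existing definition unchanged; the outer `MaxConcurrency` is what caps how many items are processed at once. Fn::Sub still resolves the `${FunctionN.Arn}` placeholders, and `$$.Map.Item.Value` has no braces, so Fn::Sub leaves it alone.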