amazon-web-servicesaws-lambdaaws-step-functions

How can get my AWS step function and lambdas to run without exceeding rate?


I am trying to run several step functions based on reading records in my sample data and invoking the step function for each piece of data in the sample data

pictorially, this is what I am doing...

Function0 will start 'n' step function routine ( see below )

enter image description here

This sample data is passed to Function0

Sample data

{
    "data": [
        "Aa",
        "Bb",
        "Cc",
        "Dd",
        "Ee",
        "Ff",
        "Gg",
        "Hh",
        "Ii",
        "Jj",
        "Kk",
        "Ll",
        "Mm",
        "Nn",
        "Oo",
        "Pp",
        "Qq",
        "Rr",
        "Ss",
        "Tt",
        "Uu",
        "Vv",
        "Ww",
        "Xx",
        "Yy",
        "Zz"
    ]
}

Here is Function0 that invokes 'n' state functions based on how many entries there are in the sample data

Function0

import json
import boto3

def handler(event, context):
    # Create a Step Functions client
    client = boto3.client('stepfunctions')

    # Extract the data from the event
    items = event.get("data", [])

    execution_arns = []
    for item in items:
        input_data = {
            "data": item 
        }

        # Start the execution of the Step Function
        response = client.start_execution(
            stateMachineArn='arn:aws:states:us-west-2:accnt:stateMachine:MyStateMachine-uPr0L9VbnFx7',
            input=json.dumps(input_data)
        )

        # Log the response from the Step Function
        print(f"Step Function execution started for item {item}: {response['executionArn']}")

        # Add the execution ARN to the list
        execution_arns.append(response['executionArn'])

    # Return all execution ARNs as part of the response
    return {
        "statusCode": 200,
        "executionArns": execution_arns
    }

This is what is typically contained in each of my Lambdas that are called in the step function

what changes from function to function is the returned data name (eg f1_output, f2_output, f3_output, f4_output, f5_output ) other than that... what they do to the data is pretty much the same; get the data from the event... suffix the data with the function number (eg 1. 2. 3. etc) and then return the data with a status code and the structure that contains the processed data.

Typical Lambda

def handler(event, context):
    data = event.get("data", [])

    processed_data = [f"1.{data}"]

    print(f"Function1 processed_data {processed_data}")

    return {
        "statusCode": 200,
        "f1_output": processed_data
    }

with the sample set I pass into the lambda test, I am getting:

Error

Rate Exceeded. (Service: Lambda, Status Code: 429, Request ID: 0ef3b2a7-9750-46f2-a7ad-3a43a6920627)

I have put a wait in the step function

              "WaitBetween1and2": {
                "Type": "Wait",
                "Seconds": 2,
                "Next": "Function2"
              },

and a random ( 1 to 5 seconds ) sleep in lambda Function3 and Function4

but I still keep getting rate exceeded.

How can I process very quick running lambdas without getting the rate exceeded ?

Template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  SAM_GeneratorV1_5

  Sample SAM Template for SAM_GeneratorV1_5


  Function0:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: experiment/function0
      Handler: app.handler
      Runtime: python3.9
      Role: !GetAtt Function0ExecutionRole.Arn
      Architectures:
        - x86_64
      Environment:
        Variables:
          DEFAULT_REGION: 'us-west-2'

  Function0ExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: InvokeStepFunctionPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: states:StartExecution
                Resource: arn:aws:states:us-west-2:accnt:stateMachine:MyStateMachine-uPr0L9VbnFx7

  Function1:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: experiment/function1
      Handler: app.handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Environment:
        Variables:
          DEFAULT_REGION: 'us-west-2' 

  Function2:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: experiment/function2
      Handler: app.handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Environment:
        Variables:
          DEFAULT_REGION: 'us-west-2' 

  Function3:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: experiment/function3
      Handler: app.handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Environment:
        Variables:
          DEFAULT_REGION: 'us-west-2' 

  Function4:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: experiment/function4
      Handler: app.handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Environment:
        Variables:
          DEFAULT_REGION: 'us-west-2' 

  Function5:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: experiment/function5
      Handler: app.handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Environment:
        Variables:
          DEFAULT_REGION: 'us-west-2' 

  MyStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      DefinitionString:
        Fn::Sub: |
          {
            "Comment": "orchestrate multiple Lambda functions",
            "StartAt": "Function1",
            "States": {
              "Function1": {
                "Type": "Task",
                "Resource": "${Function1.Arn}",
                "Retry": [
                  {
                    "ErrorEquals": ["Lambda.TooManyRequestsException"],
                    "IntervalSeconds": 2,
                    "MaxAttempts": 5,
                    "BackoffRate": 2.0
                  }
                ],
                "Next": "WaitBetween1and2"
              },
              "WaitBetween1and2": {
                "Type": "Wait",
                "Seconds": 2,
                "Next": "Function2"
              },
              "Function2": {
                "Type": "Task",
                "Resource": "${Function2.Arn}",
                "Retry": [
                  {
                    "ErrorEquals": ["Lambda.TooManyRequestsException"],
                    "IntervalSeconds": 2,
                    "MaxAttempts": 5,
                    "BackoffRate": 2.0
                  }
                ],
                "Next": "WaitBeforeParallelProcessing"
              },
              "WaitBeforeParallelProcessing": {
                "Type": "Wait",
                "Seconds": 2,
                "Next": "ParallelProcessing"
              },
              "ParallelProcessing": {
                "Type": "Parallel",
                "Branches": [
                  {
                    "StartAt": "Function3",
                    "States": {
                      "Function3": {
                        "Type": "Task",
                        "Resource": "${Function3.Arn}",
                        "Retry": [
                          {
                            "ErrorEquals": ["Lambda.TooManyRequestsException"],
                            "IntervalSeconds": 2,
                            "MaxAttempts": 5,
                            "BackoffRate": 2.0
                          }
                        ],
                        "End": true
                      }
                    }
                  },
                  {
                    "StartAt": "Function4",
                    "States": {
                      "Function4": {
                        "Type": "Task",
                        "Resource": "${Function4.Arn}",
                        "Retry": [
                          {
                            "ErrorEquals": ["Lambda.TooManyRequestsException"],
                            "IntervalSeconds": 2,
                            "MaxAttempts": 5,
                            "BackoffRate": 2.0
                          }
                        ],
                        "End": true
                      }
                    }
                  }
                ],
                "Next": "WaitBeforeFunction5"
              },
              "WaitBeforeFunction5": {
                "Type": "Wait",
                "Seconds": 2,
                "Next": "Function5"
              },
              "Function5": {
                "Type": "Task",
                "Resource": "${Function5.Arn}",
                "Retry": [
                  {
                    "ErrorEquals": ["Lambda.TooManyRequestsException"],
                    "IntervalSeconds": 2,
                    "MaxAttempts": 5,
                    "BackoffRate": 2.0
                  }
                ],
                "End": true
              }
            }
          }
      RoleArn:
        Fn::GetAtt:
          - StepFunctionRole
          - Arn

  StepFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: states.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: StepFunctionInvokePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                Resource:
                  - !GetAtt Function1.Arn
                  - !GetAtt Function2.Arn
                  - !GetAtt Function3.Arn
                  - !GetAtt Function4.Arn
                  - !GetAtt Function5.Arn

Outputs:
  StateMachineArn:
    Description: "ARN of the Step Function"
    Value: !Ref MyStateMachine

Solution

  • So the problem is that you are running many instances of your Step Function in parallel, and since they each call the same Lambdas you are ending up with a lot of simultaneous executions and hit the limit.

    To avoid this, you should get rid of Function0 and instead wrap your current State Machine in a Map State which will execute the steps for each entry in your data JSON; Within the Map State definition you can then set a maximum concurrency, which will allow you to make use of the efficiency of running in parallel without going over your limits. You can then call the updated State Machine with your full data list, and it will handle the concurrency internally.

    I don't work with CloudFormation directly, so I don't know how to achieve this in your configuration, but here is how you would implement something like this in CDK;

    const lambda_func[n] = LambdaInvoke(...)
    
    // Start with your current step function body
    const inner_steps = lambda_func1.next(lambda_func2)
                            .next(lambda_func3)
                            .next(lambda_func4)
                            .next(lambda_func5)
    
    // Wrap the steps in a Map state, each of which takes one element of the `data` list as input
    const wrapped_sfn = new MapStep(
        stack,
        "Run Lambdas for each data element",
        {
            maxConcurrency: 20,
            itemsPath: JsonPath.stringAt("$.data"),
            itemSelector: {  // Wraps each item in its own JSON with the key "data", as in function0
                "data": JsonPath.stringAt("$$.Map.Item.Value")
            },
            resultPath: "$.processedData",
        }
    );
    wrapped_sfn.itemProcessor(inner_steps);
    

    Doing the equivalent of this in your CloudFormation should enable you to run all the processes as you expect while ensuring none of your Lambdas exceed their concurrency limits.