
FIFO SQS Lambda triggering: failure handling


Our system uses FIFO SQS queues to trigger Lambda functions. Here's an excerpt from our SAM template:

  EventParserTriggeringQueue:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: 1209600  # 14 Days (max)
      FifoQueue: true
      ContentBasedDeduplication: true
      VisibilityTimeout: 240  # Must be > EventParser Timeout
      Tags:
        - Key: "datadog"
          Value: "true"
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt EventParserDeadLetters.Arn
        maxReceiveCount: 1

  EventParser:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: lambdas/event_parser_lambda/
      Handler: event_parser.lambda_handler
      Timeout: 120
      Events:
        EventParserTriggeringQueueEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt EventParserTriggeringQueue.Arn
            BatchSize: 1
            ScalingConfig:
              MaximumConcurrency: 2
      Policies:
        Statement:
          - Action:
              - ssm:GetParametersByPath
              - ssm:GetParameters
              - ssm:GetParameter
            Effect: Allow
            Resource:
              - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/datadog/api_key"
              - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/sentry/dsn"
              - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/${AWS::StackName}/*"
          - Action:
              - sqs:DeleteMessage
              - sqs:GetQueueAttributes
              - sqs:ReceiveMessage
            Effect: Allow
            Resource: !GetAtt EventParserTriggeringQueue.Arn

  EventParserDeadLetters:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: 1209600  # 14 Days (max)
      FifoQueue: true
      ContentBasedDeduplication: true
      Tags:
        - Key: "datadog"
          Value: "true"
        - Key: "deadletter"
          Value: "true"

What I'm looking for is retry behavior that looks like:
Instead, the behavior we're seeing is:
First, let me check my understanding of how the system works, because it's not really documented in any one place:
One solution I have considered:

Basically, put the lambda in charge:

Is this the best I can do?

One serious issue with this approach is that Lambda functions can't run for longer than 15 minutes, and I worry that retrying 5 times within a single invocation could put us at risk of hitting that limit.
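A minimal sketch of the "lambda in charge" idea that keeps the time limit in view, assuming the retries happen inside a single invocation. Before each retry it asks the Lambda context object how much time is left (`context.get_remaining_time_in_millis()`) and gives up early if the backoff plus a safety margin wouldn't fit. `retry_within_budget`, `max_attempts`, `safety_margin_ms` and `backoff_seconds` are hypothetical names and the defaults are only illustrative. Note that the ceiling that actually matters is the function's configured `Timeout` (120 seconds for `EventParser` above), not the 15-minute maximum.

```python
import time
from typing import Any, Callable, Optional


def retry_within_budget(
    operation: Callable[[], Any],
    context: Any,
    max_attempts: int = 5,
    safety_margin_ms: int = 10_000,
    backoff_seconds: float = 1.0,
) -> Any:
    """Call `operation` up to `max_attempts` times with exponential backoff.

    Hypothetical helper: stops retrying early when the Lambda invocation
    doesn't have enough time left. Only get_remaining_time_in_millis()
    is used from the Lambda context object.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as error:  # this sketch retries on any exception
            last_error = error
        if attempt == max_attempts:
            break  # out of attempts; no point sleeping
        delay = backoff_seconds * 2 ** (attempt - 1)
        # Give up if the backoff plus a safety margin would run past the
        # function's configured timeout.
        if context.get_remaining_time_in_millis() < delay * 1000 + safety_margin_ms:
            break
        time.sleep(delay)
    raise last_error
```

In a handler this might be called as `retry_within_budget(lambda: parse(message), context)`, where `parse` stands in for your own processing. If it gives up, the last exception propagates and normal SQS redelivery and the redrive policy take over.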


Solution

  • This answer is Python-specific, but it should be easy enough to translate to other languages.

    Broadly, yes, the lambda has to take responsibility for the queue handling when there are failures.

    I wrote the following decorator, which I attach to all the entry point functions for our SQS-triggered lambdas:

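    import functools
    import json
    from typing import Callable

    import boto3

    # Module-level SQS client (assumed; reused across warm invocations).
    sqs_client = boto3.client("sqs")


    def sqs_triggered(func: Callable) -> Callable:
        """
        Decorator function for lambdas invoked by SQS messages.

        - Because SQS events can be batched, this decorator will invoke the function once per record,
          passing the JSON-decoded message body as the first argument.
        - If the function fails on a record, this decorator sets the visibility timeout of that record and of
          every later (unprocessed) record to 0, so they can be received again right away. Records that were
          processed successfully are deleted from the queue first, then the error is re-raised.
        """

        @functools.wraps(func)
        def decorator(event, *args, **kwargs):
            for i, record in enumerate(event["Records"]):
                try:
                    func(json.loads(record["body"]), *args, **kwargs)
                except Exception:
                    if "eventSourceARN" in record:
                        # The queue name is the last field of arn:aws:sqs:<region>:<account>:<name>
                        queue_name = record["eventSourceARN"].split(":")[-1]
                        queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
                        # Delete the records that already succeeded so they aren't reprocessed.
                        for successful_record in event["Records"][:i]:
                            sqs_client.delete_message(
                                QueueUrl=queue_url,
                                ReceiptHandle=successful_record["receiptHandle"],
                            )
                        # Make the failed record and everything after it visible again immediately.
                        for failed_record in event["Records"][i:]:
                            sqs_client.change_message_visibility(
                                QueueUrl=queue_url,
                                ReceiptHandle=failed_record["receiptHandle"],
                                VisibilityTimeout=0,
                            )
                    # Re-raise so the invocation is reported as failed.
                    raise

        return decorator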
    
    

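The decorator resolves the queue URL by calling `get_queue_url` with only the queue name parsed from `eventSourceARN`. A possible variant, sketched here assuming the standard `aws` partition and its documented URL format `https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>`, builds the URL straight from the ARN. That saves an API call and the `sqs:GetQueueUrl` permission. `queue_url_from_arn` is a hypothetical helper name.

```python
def queue_url_from_arn(queue_arn: str) -> str:
    """Build an SQS queue URL from a queue ARN (hypothetical helper).

    Assumes the standard commercial partition ("aws"); other partitions
    use different endpoint domains and are rejected here.
    """
    # ARN layout: arn:<partition>:sqs:<region>:<account-id>:<queue-name>
    parts = queue_arn.split(":")
    if len(parts) != 6 or parts[0] != "arn" or parts[2] != "sqs":
        raise ValueError(f"not an SQS queue ARN: {queue_arn!r}")
    _, partition, _, region, account_id, queue_name = parts
    if partition != "aws":
        raise ValueError(f"unsupported partition in this sketch: {partition!r}")
    return f"https://sqs.{region}.amazonaws.com/{account_id}/{queue_name}"
```

Inside the decorator, `queue_url = queue_url_from_arn(record["eventSourceARN"])` would replace the two lines that parse the name and call `get_queue_url`. If you'd rather keep `get_queue_url`, passing `QueueOwnerAWSAccountId` (the fifth ARN field) makes the lookup work for queues owned by another account.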
    Note that this means the lambda needs some extra permissions: sqs:ChangeMessageVisibility and sqs:GetQueueUrl, on top of the sqs:DeleteMessage it already needs. If you're using SAM/CloudFormation, that looks like:

    Resources:
      FooBar:
        Type: AWS::Serverless::Function
        ...
        Properties:
          Policies:
            Statement:
              - Action:
                  - sqs:ChangeMessageVisibility
                  - sqs:GetQueueUrl
                Effect: Allow
                Resource: !GetAtt FooBarQueue.Arn
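For comparison, Lambda also has a built-in way for the function to report per-message failures: partial batch responses. This is a different mechanism from the decorator above, sketched here under the assumption that you enable it on the event source (in SAM, `FunctionResponseTypes: [ReportBatchItemFailures]` under the SQS event's `Properties`). The handler returns the `messageId`s that failed in a `batchItemFailures` list, and Lambda deletes only the other messages; for FIFO queues AWS recommends stopping at the first failure and reporting every remaining message too, to preserve ordering. Two differences from the decorator: the function makes no SQS calls itself, so it needs no extra permissions, and the reported messages are not made visible immediately but reappear once the queue's visibility timeout expires. `fifo_batch_handler` and `process` are hypothetical names.

```python
import json
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)

BatchResponse = Dict[str, List[Dict[str, str]]]


def fifo_batch_handler(process: Callable[[Any], None]) -> Callable[[Dict[str, Any], Any], BatchResponse]:
    """Wrap a per-message function (hypothetical `process`) as an SQS handler
    that returns a partial batch response.

    Requires ReportBatchItemFailures on the event source mapping. Without it,
    Lambda ignores the return value and, since nothing is raised, treats the
    whole batch as successful and deletes every message.
    """

    def handler(event: Dict[str, Any], context: Any) -> BatchResponse:
        records = event["Records"]
        for i, record in enumerate(records):
            try:
                process(json.loads(record["body"]))
            except Exception:
                logger.exception("Failed to process message %s", record["messageId"])
                # FIFO: report this record and every record after it as failed,
                # so later messages aren't processed ahead of the failed one.
                return {
                    "batchItemFailures": [
                        {"itemIdentifier": r["messageId"]} for r in records[i:]
                    ]
                }
        # An empty list tells Lambda the whole batch succeeded.
        return {"batchItemFailures": []}

    return handler
```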