Our system uses FIFO SQS queues to drive lambdas. Here's the relevant part of our SAM template:
EventParserTriggeringQueue:
  Type: AWS::SQS::Queue
  Properties:
    MessageRetentionPeriod: 1209600 # 14 days (max)
    FifoQueue: true
    ContentBasedDeduplication: true
    VisibilityTimeout: 240 # Must be > EventParser Timeout
    Tags:
      - Key: "datadog"
        Value: "true"
    RedrivePolicy:
      deadLetterTargetArn: !GetAtt EventParserDeadLetters.Arn
      maxReceiveCount: 1

EventParser:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: lambdas/event_parser_lambda/
    Handler: event_parser.lambda_handler
    Timeout: 120
    Events:
      EventParserTriggeringQueueEvent:
        Type: SQS
        Properties:
          Queue: !GetAtt EventParserTriggeringQueue.Arn
          BatchSize: 1
          ScalingConfig:
            MaximumConcurrency: 2
    Policies:
      Statement:
        - Action:
            - ssm:GetParametersByPath
            - ssm:GetParameters
            - ssm:GetParameter
          Effect: Allow
          Resource:
            - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/datadog/api_key"
            - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/sentry/dsn"
            - Fn::Sub: "arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/${AWS::StackName}/*"
        - Action:
            - sqs:DeleteMessage
            - sqs:GetQueueAttributes
            - sqs:ReceiveMessage
          Effect: Allow
          Resource: !GetAtt EventParserTriggeringQueue.Arn

EventParserDeadLetters:
  Type: AWS::SQS::Queue
  Properties:
    MessageRetentionPeriod: 1209600 # 14 days (max)
    FifoQueue: true
    ContentBasedDeduplication: true
    Tags:
      - Key: "datadog"
        Value: "true"
      - Key: "deadletter"
        Value: "true"
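As background on the ContentBasedDeduplication setting: when it is enabled, SQS derives the deduplication ID from an SHA-256 hash of the message body, so two sends with byte-identical bodies inside the deduplication window are collapsed into one message. A small sketch of that relationship (the helper name is ours, not an AWS API):

```python
import hashlib
import json


def content_dedup_id(body: str) -> str:
    # With ContentBasedDeduplication, SQS computes the deduplication ID
    # as the SHA-256 hash of the message body.
    return hashlib.sha256(body.encode("utf-8")).hexdigest()


a = content_dedup_id(json.dumps({"event_id": 7}))
b = content_dedup_id(json.dumps({"event_id": 7}))
c = content_dedup_id(json.dumps({"event_id": 8}))
# Identical bodies -> identical dedup IDs, so the second send is dropped;
# a different body gets a different ID and is delivered normally.
```

This is why re-sending the exact same payload for a retry can silently deduplicate away; include something unique in the body (or set an explicit MessageDeduplicationId) if each send must be delivered.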
Here's my understanding of the mechanics:

- Behind the scenes, the Lambda service calls ReceiveMessage on the SQS queue periodically. From our system, it looks like the default is once every 10 seconds.
- While a message is in flight, subsequent ReceiveMessage calls to the queue (for the same message group ID) come back empty. (This is a FIFO SQS feature. For non-FIFO queues, only the received messages are hidden.)
- If a message has been received more times than maxReceiveCount, the queue gives up on the message, optionally placing it on a dead-letter queue.
- When the lambda succeeds, the service calls DeleteMessage on the queue. This removes the head message, and also makes the next message available (i.e. it clears the visibility timeout).
- When the lambda fails, the message cannot be received again via ReceiveMessage until the visibility timeout has elapsed.

Basically, put the lambda in charge:
One serious issue with this approach: Lambda functions can't run longer than 15 minutes, and I do worry that retrying five times could put us at risk.
This answer is Python-specific, but hopefully it will be easy enough to translate to other languages.
Broadly, yes, the lambda has to take responsibility for the queue handling when there are failures.
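To make the per-group blocking from the question concrete before showing the decorator, here is a toy in-memory model (a sketch of the behavior as described above, not the real SQS API; the class and method names are illustrative assumptions):

```python
class ToyFifoQueue:
    """Toy model of FIFO SQS head-of-line blocking per message group."""

    def __init__(self):
        self.messages = []            # (group_id, body) in arrival order
        self.in_flight_groups = set()

    def send(self, group_id, body):
        self.messages.append((group_id, body))

    def receive(self):
        # Deliver the first message whose group has nothing in flight.
        # A group with an unacknowledged message returns nothing -- this
        # mirrors FIFO SQS hiding the whole group, not just one message.
        for group_id, body in self.messages:
            if group_id not in self.in_flight_groups:
                self.in_flight_groups.add(group_id)
                return group_id, body
        return None

    def delete(self, group_id, body):
        # Like DeleteMessage: removes the message and unblocks its group.
        self.messages.remove((group_id, body))
        self.in_flight_groups.discard(group_id)


q = ToyFifoQueue()
q.send("g1", "first")
q.send("g1", "second")

msg = q.receive()       # ("g1", "first") is delivered
blocked = q.receive()   # None: "second" is hidden while "first" is in flight
q.delete(*msg)
unblocked = q.receive() # ("g1", "second") becomes available after the delete
```

The failure path is the interesting one: if the consumer never calls delete, the group stays blocked until the visibility timeout expires, which is exactly what the decorator below works around by resetting the timeout to 0.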
I wrote the following decorator, which I attach to all the entry point functions for our SQS-triggered lambdas:
import functools
import json
from typing import Callable

import boto3

sqs_client = boto3.client("sqs")


def sqs_triggered(func) -> Callable:
    """
    Decorator for lambda entry points invoked by SQS messages.
    - Because SQS events can be batched, this decorator invokes the function once
      per record.
    - If the function fails, this decorator resets the visibility timeout of the
      failed records to 0. If there were any successful records, it deletes them
      from the queue before propagating the error.
    """
    @functools.wraps(func)
    def decorator(event, *args, **kwargs):
        for i, record in enumerate(event["Records"]):
            try:
                func(json.loads(record["body"]), *args, **kwargs)
            except Exception:
                if "eventSourceARN" in record:
                    # The queue name is the last segment of the ARN, e.g.
                    # arn:aws:sqs:us-east-1:123456789012:my-queue.fifo
                    queue_name = record["eventSourceARN"].split(":")[-1]
                    queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
                    # Acknowledge the records that already succeeded...
                    for successful_record in event["Records"][:i]:
                        sqs_client.delete_message(
                            QueueUrl=queue_url,
                            ReceiptHandle=successful_record["receiptHandle"],
                        )
                    # ...and make the failed record, plus the rest of the batch,
                    # immediately visible again.
                    for failed_record in event["Records"][i:]:
                        sqs_client.change_message_visibility(
                            QueueUrl=queue_url,
                            ReceiptHandle=failed_record["receiptHandle"],
                            VisibilityTimeout=0,
                        )
                raise
    return decorator
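For illustration, the calling convention can be exercised locally with a fabricated event. This uses a simplified stand-in for the decorator that skips the SQS cleanup (so it runs without AWS credentials); the handler name and payload fields are made up:

```python
import json


def sqs_triggered(func):
    # Simplified stand-in for the real decorator above: same per-record
    # fan-out, but without the SQS delete/visibility calls on failure.
    def decorator(event, *args, **kwargs):
        for record in event["Records"]:
            func(json.loads(record["body"]), *args, **kwargs)
    return decorator


processed = []


@sqs_triggered
def lambda_handler(body):
    # `body` is the already-parsed message payload, not the raw SQS record.
    processed.append(body["order_id"])


# A fabricated SQS event with two batched records, shaped like a real invocation.
event = {
    "Records": [
        {"body": json.dumps({"order_id": 1})},
        {"body": json.dumps({"order_id": 2})},
    ]
}
lambda_handler(event)
# processed == [1, 2]
```

The point is that the wrapped function sees one parsed body at a time, so the batching and cleanup concerns live entirely in the decorator.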
Note that this means the lambda will need some extra permissions. If you're using CloudFormation, that looks like:
Resources:
  FooBar:
    Type: AWS::Serverless::Function
    ...
    Properties:
      Policies:
        Statement:
          - Action:
              - sqs:ChangeMessageVisibility
              - sqs:GetQueueUrl
            Effect: Allow
            Resource: !GetAtt FooBarQueue.Arn