I have a CloudFormation template which I am using to configure lambda to decrypt AWS RDS database activity stream logs
I used the lambda function from: https://github.com/aws-samples/optimising-aws-kms-decryption-cost-for-das/blob/main/lambda.py
However, in lambda logs, I see InvalidCipherTextException, and I do not understand why I am getting it, because everything seems to be alright.
Lambda logs from CloudWatch:
2025-05-01T18:50:01.777Z
Processing record:
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "741a3939-d892-4df5-934a-9479252cb48d",
"sequenceNumber": "49662849271037707186170471169412978102974199784156430338",
"data": "eyJ0eXBlIjogIkRhdGFiYXNlQWN0aXZpdHlNb25pdG9yaW5nUmVjb3JkcyIsInZlcnNpb24iOiAiMS4yIiw...",
"approximateArrivalTimestamp": 1746125400.734
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49662849271037707186170471169412978102974199784156430338",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::xxx:role/ops-rds-stream-decrypted-logs-DevDecryptLambdaRole-g0cLkmhkldp9",
"awsRegion": "eu-central-1",
"eventSourceARN": "arn:aws:kinesis:eu-central-1:xxx:stream/aws-rds-das-cluster-xxx"
}
Processing record: {"kinesis": {"kinesisSchemaVersion": "1.0", "partitionKey": "741a3939-d892-4df5-934a-9479252cb48d", "sequenceNumber": "49662849271037707186170471169412978102974199784156430338", "data": "eyJ0eXBlIjogIkRhdGFiYXNlQWN0aXZpdHlNb25pdG9yaW5nUmVjb3JkcyIsInZlcnNpb24iOiAiMS4yIiwiZGF0YWJhc2VBY3Rpdml0eUV2ZW50cyI6ICJBWUFEZUNTbG1tUTFOTjE1UHJmZjZXcVBpMjRBWHdBQkFCVmhkM010WTNKNWNIUnZMWEIxWW14cFl5MXJaWGtBUkVGNFEwd3JNSGhJWm5WSVNXaFZSbGhDTTNZeGNWQkVLMHBDY1dkelNIUmpWMmwwZEdwUFREZ3hNSGhEVG10clJGQXZUa00wVWpoMVRWZGFZVEZyUTFKNlVUMDlBQUVBQWtKREFCdEVZWFJoUzJWNUFBQUFnQUFBQUF3Wnkwdk8vWmVhWDRWZ3g3NEFNSlBTVVlrb1dSM0FKV1QyU2VvcUZJbWl4cHB3Y0g5VzhqdkN2bDdBOVkxNHpHdjR5Q081dnBLTUhGYllBVms4eGdJQUFBQUFEQUFFQUFBQUFBQUFBQUFBQUFBQUFBQXNUZ2lCSEhBSXNUQXpScmZFcFZDci8vLy8vd0FBQUFFQUFBQUFBQUFBQUFBQUFBRUFBQUlJeXoxWnBFLyt4OXF6WmZPUW9EYXB4U3dSRkwwbGJaQ0ppeHhkVnB0dmRodUhRNkM3Z3NJUGlBdTU4SURueGt2WHlNTDRYZTBNZnhNMUh5bWdsYmVCS2FWVDlKQkZtWVpxMTRwa3dGUTlob1dxTUVqc3cxMllwN2IzcDZyRmJIaEkyVDIzdU91TnF5cVNLTGloVVdWbE5PN0FqcWZYNjNWeDhaVXdITXNEWWRkQmFvRkJKVm9WRXpIS3RnbE5DMU9KZXlQa2VqMzIzeFFJUG5JY0RBbFRzdkxpYVlzSWtPdm4rYVRXZVdqaWNoZEdhUngrRkg0ZVJxQmp4ODRBK1VIQ21tbG45WG02QWk3eTB4UjM1N0l5YW5LWW40RUdJT2ZiN0NtSmx0YXE5OHYweUpRVUFrL0lFNzFuSkFpRC9WWkRtdUFjRUJLZkl1V0czaXNieFhIQXJ2bWFRVWdYcVdRaXRlelgvS1hlcE9KeG8zYXd0ZXJUYVVnNmtVeUtzZkpyVHRYUEhKdmd4STY0QWp6eUNXb1l4UXRnR0NZZERrK09NVXEvVmVyOUZ1Ky9IcXRuV2krRzJEMUpwU1gwb1VaTlk3bm80N09FUUE5OFE0bnRaeWNseFptRW1Qc2F2V0hIYWtXNHAyajZhbVRoNjd3NVBnR2E0T0dUZzNEdXhOYjJOeWdGNzJTKzFib1VJSWJHVXQ1ZFJYQkRTUmNodHNYQjNMZ2lRazUrOEZOc3kvYWlVa2NXTjIrRys4Ym5RS250bmlUTThISVIvbnphYTI0dmJIR3VrT0V2WE5QMFc3WmhNTDRmNEU1NHJRY1lnQjN5aWJKNGRMcnB3Nms3dGRnaTRyM3NoYWNPVS9jbmlMaVBYSEZjSkhud1g0NXNRQWkxNVFOcVZFTUN6bE82M2NKL3o0T2VyTmN1N1Frd1lNd01JbENiYkNPUEhVNEFaekJsQWpFQXhpTndDNTc5L2QrdXVHTWxhRTE1dlZiSWdDZ1hPN3Zad3czeXFQS25zMUFUNTgzejBMclVaTngvQTlwU0c3RWZBakFYbk16dkkyMVRhYU5Zem1iekRLZ2czMkJ3ajhCdWhESUhSUDYrNmU2aWViNXR0VHMzQ3B4blkvaHUwY0l0Z1VZPSIsImtleSIgOiAiQVFJREFIZ0RIVGN6V0VXejB1MFB0N0VPMkpISHhTTUVIZFd5bk1rUS9xMDZuVGpmMndGbEcvVDYxNW1TRzI0VzRobm1Fa2JnQUFBQWZqQjhCZ2txaGtpRzl3MEJCd2FnYnpCdEFnRUFNR2dHQ1NxR1NJYjNEUUVIQVRBZUJnbGdoa2dCWlFNRUFTNHdFUVFNTWU0Y0djVFhxZmVXRkJTa0FnRVFnRHY4K3ZKcUM1VTFhZEk3YlZLbkE1eGVNS3ByTHIyQ3hiRVRJUnMxaGdrTURudVl5a2gzb2JtUzdISmNFVHJQeThnMDMwNzdieWVrM1dwMXVnPT0ifQ==", "approximateArrivalTimestamp": 1746125400.734}, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000000:49662849271037707186170471169412978102974199784156430338", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::618550961437:role/ops-rds-stream-decrypted-logs-DevDecryptLambdaRole-g0cLkmhkldp9", "awsRegion": "eu-central-1", "eventSourceARN": "arn:aws:kinesis:eu-central-1:xxx:stream/aws-rds-das-cluster-xxx"}
2025-05-01T18:50:01.777Z
Base64-decoded data: b'
{
"type": "DatabaseActivityMonitoringRecords",
"version": "1.2",
"databaseActivityEvents": "AYADeCSlmmQ1NN15Prff6WqPi24AXwABABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREF4Q0wrMHhIZnVISWh...",
"key": "AQIDAHgDHTczWEWz0u0Pt7EO2JHHxSMEHdWynMkQ/q06nTjf2wFlG/T615mSG24W4hnmEkbgAAAAfjB8Bgk..."
}
'
Decoded record data:
{
"type": "DatabaseActivityMonitoringRecords",
"version": "1.2",
"databaseActivityEvents": "AYADeCSlmmQ1NN15Prff6WqPi24AXwABABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREF4Q0wrMHhIZnVISWh...",
"key": "AQIDAHgDHTczWEWz0u0Pt7EO2JHHxSMEHdWynMkQ/q06nTjf2wFlG/T615mSG24W4hnmEkbgAAAAfjB8Bgk..."
}
Decoded payload: b'\x01\x80\x03x$\xa5\x9ad54\xddy>\xb7\xdf\xe9j\x8f\x8bn\x00_\x00\x01\x00\x15aws-crypto-public-...'
Decoded data key: b"\x01\x02\x03\x00x\x03\x1d73XE\xb3\xd2\xed\x0f\xb7\xb1\x0e\xd8\x91\xc7\xc5#\x04\x1d\xd5\xb2\x9c\xc9\x10\xfe\xad:\x9d8\xdf\xdb\x01e\x1b\xf4\xfa\xd7\x99\x92\x1bn\x16\xe2..."
2025-05-01T18:50:01.789Z
Decrypting data key: b"\x01\x02\x03\x00x\x03\x1d73XE\xb3\xd2\xed\x0f\xb7\xb1\x0e\xd8\x91\xc7\xc5#\x04\x1d\xd5\xb2\x9c\xc9\x10\xfe\xad:\x9d8\xdf\xdb\x01e\x1b\xf4\xfa\xd7\x99\x92\x1bn\x16\xe2..."
Error processing record: An error occurred (InvalidCiphertextException) when calling the Decrypt operation:
Cloudformation template that I have written:
AWSTemplateFormatVersion: 2010-09-09
Description: Dev-only setup to decrypt RDS DAS logs using KMS and store them in S3
Parameters:
SourceKinesisStreamARN:
Type: String
Default: arn:aws:kinesis:eu-central-1:xxx:stream/aws-rds-das-cluster-xxxx
KMSKeyARN:
Type: String
Default: arn:aws:kms:eu-central-1:xxx:key/xxx
Region:
Type: String
Default: eu-central-1
ResourceId:
Type: String
Default: srv-rds-cluster-databasecluster-xxx # Aurora RDS Cluster ID
Resources:
# S3 Bucket to store decrypted logs
DevDecryptedLogsBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: rds-das-dev-decrypted-logs # New S3 Bucket in dev account
LifecycleConfiguration:
Rules:
- Status: Enabled
Transitions:
- StorageClass: GLACIER
TransitionInDays: 30
# IAM Role for Lambda to read from Kinesis and write to S3
DevDecryptLambdaRole:
Type: AWS::IAM::Role
Properties:
PermissionsBoundary: !Sub arn:aws:iam::${AWS::AccountId}:policy/ScopePermissions
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: DevDecryptLambdaPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- kinesis:GetRecords
- kinesis:GetShardIterator
- kinesis:DescribeStream
- kinesis:ListStreams
Resource: !Ref SourceKinesisStreamARN
- Effect: Allow
Action:
- s3:PutObject
Resource: arn:aws:s3:::rds-das-dev-decrypted-logs/*
- Effect: Allow
Action:
- kms:Decrypt
Resource: !Ref KMSKeyARN
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: '*'
# Lambda function to decrypt the logs and store them in S3
DevDASDecryptLambda:
Type: AWS::Lambda::Function
Properties:
Handler: index.lambda_handler
Runtime: python3.10
Architectures:
- arm64
Timeout: 60
MemorySize: 512
Role: !GetAtt DevDecryptLambdaRole.Arn
Environment:
Variables:
S3_BUCKET: !Ref DevDecryptedLogsBucket # S3 bucket for decrypted logs
KMS_KEY_ID: !Ref KMSKeyARN # KMS key ID for decryption
REGION_NAME: !Ref Region # Region for decryption
RESOURCE_ID: !Ref ResourceId # RDS Aurora cluster ID
STREAM_NAME: !Ref SourceKinesisStreamARN # Kinesis stream name
Code:
ZipFile: |
import json
import zlib
import aws_encryption_sdk
import boto3
import base64
import os
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
# Constants for AWS resources and region. Please use environment variables instead
REGION_NAME = os.environ['REGION_NAME']
RESOURCE_ID = os.environ['RESOURCE_ID']
STREAM_NAME = os.environ['STREAM_NAME']
# Initialize the AWS Encryption SDK client with a specific commitment policy
enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT)
# Custom key provider class for raw encryption/decryption operations
class MyRawMasterKeyProvider(RawMasterKeyProvider):
provider_id = "BC" # Custom provider ID
def __new__(cls, *args, **kwargs):
# Overriding the object creation process for proper initialization
obj = super(RawMasterKeyProvider, cls).__new__(cls)
return obj
def __init__(self, plain_key):
# Initializing the parent class and setting up a wrapping key
super().__init__()
self.wrapping_key = WrappingKey(
wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
wrapping_key=plain_key,
wrapping_key_type=EncryptionKeyType.SYMMETRIC)
def _get_raw_key(self, key_id):
# Method to retrieve the raw key; here, it returns the initialized wrapping key
return self.wrapping_key
# Class for caching decrypted data keys using AWS KMS
class KMSDataKeyCache():
def __init__(self, session):
# Initialize the KMS client and a simple dictionary for caching keys
self.kms_client = session.client('kms', region_name=REGION_NAME)
self.key_cache = {}
def getDecrypted(self, data_key_decoded):
# Attempt to retrieve the decrypted key from cache or decrypt it using KMS
if data_key_decoded in self.key_cache:
print("Cache hit for data key.")
return self.key_cache[data_key_decoded]
else:
print(f"Decrypting data key: {data_key_decoded}")
# Decrypt the key using KMS and store it in the cache
data_key_decrypt_result = self.kms_client.decrypt(
CiphertextBlob=data_key_decoded,
EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})
self.key_cache[data_key_decoded] = data_key_decrypt_result['Plaintext']
print(f"Data key decrypted successfully.")
return data_key_decrypt_result['Plaintext']
# Function to decrypt payload with a provided data key
def decrypt_payload(payload, data_key):
print(f"Decrypting payload with data key: {data_key}") # Debugging: Print the full data key
# Setup the key provider and decrypt the payload
my_key_provider = MyRawMasterKeyProvider(data_key)
my_key_provider.add_master_key("DataKey")
decrypted_plaintext, header = enc_client.decrypt(
source=payload,
materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(
master_key_provider=my_key_provider))
print(f"Decrypted payload: {decrypted_plaintext}") # Debugging: Print the full decrypted payload
return decrypted_plaintext
# Function to decrypt and then decompress the payload
def decrypt_decompress(payload, key):
print(f"Decompressing payload...") # Debugging: Indicate decompression
decrypted = decrypt_payload(payload, key)
decompressed = zlib.decompress(decrypted, zlib.MAX_WBITS + 16)
print(f"Decompressed payload: {decompressed}") # Debugging: Print the full decompressed payload
return decompressed
# The main Lambda handler function
def lambda_handler(event, context):
# Initialize a session and the KMS data key cache
print("Initializing KMS Data Key Cache...")
session = boto3.session.Session()
kms_data_key_cache = KMSDataKeyCache(session)
# Process each record in the event
for record in event['Records']:
try:
print(f"Processing record: {json.dumps(record)}") # Debugging: Print the full record
# Decode and parse the incoming data
data = base64.b64decode(record['kinesis']['data'])
print(f"Base64-decoded data: {data}") # Debugging: Print the full base64-decoded data
record_data = json.loads(data)
print(f"Decoded record data: {json.dumps(record_data)}") # Debugging: Print the full decoded record data
payload_decoded = base64.b64decode(record_data['databaseActivityEvents'])
print(f"Decoded payload: {payload_decoded}") # Debugging: Print the full decoded payload
data_key_decoded = base64.b64decode(record_data['key'])
print(f"Decoded data key: {data_key_decoded}") # Debugging: Print the full decoded data key
# Get the decrypted data key from the cache or KMS
decrypted_data_key = kms_data_key_cache.getDecrypted(data_key_decoded)
print(f"Decrypted data key: {decrypted_data_key}") # Debugging: Print the full decrypted data key
# Decrypt and decompress the payload
decrypted_decompressed_payload = decrypt_decompress(payload_decoded, decrypted_data_key)
plaintext = decrypted_decompressed_payload.decode('utf-8')
print(f"Decrypted and decompressed payload: {plaintext}") # Debugging: Print the full plaintext
# Load the JSON events and log them
events = json.loads(plaintext)
print("Processed events:", events) # Debugging: Print the full events processed
except Exception as e:
# Log any errors encountered during processing
print(f"Error processing record: {str(e)}")
# Return a success status code and message
return {
'statusCode': 200,
'body': json.dumps('Processing Complete')
}
# Event source mapping to trigger the Lambda function from Kinesis stream
DevDecryptLambdaEventSource:
Type: AWS::Lambda::EventSourceMapping
Properties:
BatchSize: 100
EventSourceArn: !Ref SourceKinesisStreamARN
FunctionName: !Ref DevDASDecryptLambda
StartingPosition: LATEST
Outputs:
DevDecryptedLogsBucketName:
Description: The S3 bucket where decrypted logs will be stored
Value: rds-das-dev-decrypted-logs
Fixed! The database cluster has Resource ID in the format cluster-xxxxxxx. You need to specify this in the encryption context. Decryption works now