python, aws-lambda, amazon-rds, amazon-kinesis, amazon-kms

AWS Lambda: Unable to decrypt RDS Activity Stream using KMS (InvalidCiphertextException)


I have a CloudFormation template which I am using to configure a Lambda function to decrypt AWS RDS Database Activity Stream (DAS) logs.

I used the Lambda function from: https://github.com/aws-samples/optimising-aws-kms-decryption-cost-for-das/blob/main/lambda.py

However, in the Lambda logs I see InvalidCiphertextException, and I do not understand why I am getting it, because everything else seems to be working.

Lambda logs from CloudWatch:

2025-05-01T18:50:01.777Z
Processing record: 
{
    "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "741a3939-d892-4df5-934a-9479252cb48d",
        "sequenceNumber": "49662849271037707186170471169412978102974199784156430338",
        "data": "eyJ0eXBlIjogIkRhdGFiYXNlQWN0aXZpdHlNb25pdG9yaW5nUmVjb3JkcyIsInZlcnNpb24iOiAiMS4yIiw...",
        "approximateArrivalTimestamp": 1746125400.734
    },
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000000:49662849271037707186170471169412978102974199784156430338",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::xxx:role/ops-rds-stream-decrypted-logs-DevDecryptLambdaRole-g0cLkmhkldp9",
    "awsRegion": "eu-central-1",
    "eventSourceARN": "arn:aws:kinesis:eu-central-1:xxx:stream/aws-rds-das-cluster-xxx"
}


Processing record: {"kinesis": {"kinesisSchemaVersion": "1.0", "partitionKey": "741a3939-d892-4df5-934a-9479252cb48d", "sequenceNumber": "49662849271037707186170471169412978102974199784156430338", "data": "eyJ0eXBlIjogIkRhdGFiYXNlQWN0aXZpdHlNb25pdG9yaW5nUmVjb3JkcyIsInZlcnNpb24iOiAiMS4yIiwiZGF0YWJhc2VBY3Rpdml0eUV2ZW50cyI6ICJBWUFEZUNTbG1tUTFOTjE1UHJmZjZXcVBpMjRBWHdBQkFCVmhkM010WTNKNWNIUnZMWEIxWW14cFl5MXJaWGtBUkVGNFEwd3JNSGhJWm5WSVNXaFZSbGhDTTNZeGNWQkVLMHBDY1dkelNIUmpWMmwwZEdwUFREZ3hNSGhEVG10clJGQXZUa00wVWpoMVRWZGFZVEZyUTFKNlVUMDlBQUVBQWtKREFCdEVZWFJoUzJWNUFBQUFnQUFBQUF3Wnkwdk8vWmVhWDRWZ3g3NEFNSlBTVVlrb1dSM0FKV1QyU2VvcUZJbWl4cHB3Y0g5VzhqdkN2bDdBOVkxNHpHdjR5Q081dnBLTUhGYllBVms4eGdJQUFBQUFEQUFFQUFBQUFBQUFBQUFBQUFBQUFBQXNUZ2lCSEhBSXNUQXpScmZFcFZDci8vLy8vd0FBQUFFQUFBQUFBQUFBQUFBQUFBRUFBQUlJeXoxWnBFLyt4OXF6WmZPUW9EYXB4U3dSRkwwbGJaQ0ppeHhkVnB0dmRodUhRNkM3Z3NJUGlBdTU4SURueGt2WHlNTDRYZTBNZnhNMUh5bWdsYmVCS2FWVDlKQkZtWVpxMTRwa3dGUTlob1dxTUVqc3cxMllwN2IzcDZyRmJIaEkyVDIzdU91TnF5cVNLTGloVVdWbE5PN0FqcWZYNjNWeDhaVXdITXNEWWRkQmFvRkJKVm9WRXpIS3RnbE5DMU9KZXlQa2VqMzIzeFFJUG5JY0RBbFRzdkxpYVlzSWtPdm4rYVRXZVdqaWNoZEdhUngrRkg0ZVJxQmp4ODRBK1VIQ21tbG45WG02QWk3eTB4UjM1N0l5YW5LWW40RUdJT2ZiN0NtSmx0YXE5OHYweUpRVUFrL0lFNzFuSkFpRC9WWkRtdUFjRUJLZkl1V0czaXNieFhIQXJ2bWFRVWdYcVdRaXRlelgvS1hlcE9KeG8zYXd0ZXJUYVVnNmtVeUtzZkpyVHRYUEhKdmd4STY0QWp6eUNXb1l4UXRnR0NZZERrK09NVXEvVmVyOUZ1Ky9IcXRuV2krRzJEMUpwU1gwb1VaTlk3bm80N09FUUE5OFE0bnRaeWNseFptRW1Qc2F2V0hIYWtXNHAyajZhbVRoNjd3NVBnR2E0T0dUZzNEdXhOYjJOeWdGNzJTKzFib1VJSWJHVXQ1ZFJYQkRTUmNodHNYQjNMZ2lRazUrOEZOc3kvYWlVa2NXTjIrRys4Ym5RS250bmlUTThISVIvbnphYTI0dmJIR3VrT0V2WE5QMFc3WmhNTDRmNEU1NHJRY1lnQjN5aWJKNGRMcnB3Nms3dGRnaTRyM3NoYWNPVS9jbmlMaVBYSEZjSkhud1g0NXNRQWkxNVFOcVZFTUN6bE82M2NKL3o0T2VyTmN1N1Frd1lNd01JbENiYkNPUEhVNEFaekJsQWpFQXhpTndDNTc5L2QrdXVHTWxhRTE1dlZiSWdDZ1hPN3Zad3czeXFQS25zMUFUNTgzejBMclVaTngvQTlwU0c3RWZBakFYbk16dkkyMVRhYU5Zem1iekRLZ2czMkJ3ajhCdWhESUhSUDYrNmU2aWViNXR0VHMzQ3B4blkvaHUwY0l0Z1VZPSIsImtleSIgOiAiQVFJREFIZ0RIVGN6V0VXejB1MFB0N0VPMkpISHhTTUVIZFd5bk1rUS9xMDZuVGpmMndGbEcvVDYxNW1TRzI0VzRobm1Fa2JnQUFBQWZqQjhCZ2txaGtpRzl3MEJCd2FnYnpCdEFnRUFNR2dHQ1NxR1NJYjNEUUVIQVRBZUJnbGdoa2dCWlFNRUFTNHdFUVFNTWU0Y0djVFhxZmVXRkJTa0FnRVFnRHY4K3ZKcUM1VTFhZEk3YlZLbkE1eGVNS3ByTHIyQ3hiRVRJUnMxaGdrTURudVl5a2gzb2JtUzdISmNFVHJQeThnMDMwNzdieWVrM1dwMXVnPT0ifQ==", "approximateArrivalTimestamp": 1746125400.734}, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000000:49662849271037707186170471169412978102974199784156430338", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::618550961437:role/ops-rds-stream-decrypted-logs-DevDecryptLambdaRole-g0cLkmhkldp9", "awsRegion": "eu-central-1", "eventSourceARN": "arn:aws:kinesis:eu-central-1:xxx:stream/aws-rds-das-cluster-xxx"}
2025-05-01T18:50:01.777Z
Base64-decoded data: b'
{
    "type": "DatabaseActivityMonitoringRecords",
    "version": "1.2",
    "databaseActivityEvents": "AYADeCSlmmQ1NN15Prff6WqPi24AXwABABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREF4Q0wrMHhIZnVISWh...",
    "key": "AQIDAHgDHTczWEWz0u0Pt7EO2JHHxSMEHdWynMkQ/q06nTjf2wFlG/T615mSG24W4hnmEkbgAAAAfjB8Bgk..."
}
'



Decoded record data: 
{
    "type": "DatabaseActivityMonitoringRecords",
    "version": "1.2",
    "databaseActivityEvents": "AYADeCSlmmQ1NN15Prff6WqPi24AXwABABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREF4Q0wrMHhIZnVISWh...",
    "key": "AQIDAHgDHTczWEWz0u0Pt7EO2JHHxSMEHdWynMkQ/q06nTjf2wFlG/T615mSG24W4hnmEkbgAAAAfjB8Bgk..."
}



Decoded payload: b'\x01\x80\x03x$\xa5\x9ad54\xddy>\xb7\xdf\xe9j\x8f\x8bn\x00_\x00\x01\x00\x15aws-crypto-public-...'


Decoded data key: b"\x01\x02\x03\x00x\x03\x1d73XE\xb3\xd2\xed\x0f\xb7\xb1\x0e\xd8\x91\xc7\xc5#\x04\x1d\xd5\xb2\x9c\xc9\x10\xfe\xad:\x9d8\xdf\xdb\x01e\x1b\xf4\xfa\xd7\x99\x92\x1bn\x16\xe2..."
2025-05-01T18:50:01.789Z

Decrypting data key: b"\x01\x02\x03\x00x\x03\x1d73XE\xb3\xd2\xed\x0f\xb7\xb1\x0e\xd8\x91\xc7\xc5#\x04\x1d\xd5\xb2\x9c\xc9\x10\xfe\xad:\x9d8\xdf\xdb\x01e\x1b\xf4\xfa\xd7\x99\x92\x1bn\x16\xe2..."


Error processing record: An error occurred (InvalidCiphertextException) when calling the Decrypt operation: 

CloudFormation template that I have written:

AWSTemplateFormatVersion: 2010-09-09
Description: Dev-only setup to decrypt RDS DAS logs using KMS and store them in S3

Parameters:
  SourceKinesisStreamARN:
    Type: String
    Default: arn:aws:kinesis:eu-central-1:xxx:stream/aws-rds-das-cluster-xxxx
  KMSKeyARN:
    Type: String
    Default: arn:aws:kms:eu-central-1:xxx:key/xxx
  Region:
    Type: String
    Default: eu-central-1
  ResourceId:
    Type: String
    Default: srv-rds-cluster-databasecluster-xxx # Aurora RDS Cluster ID

Resources:
  # S3 Bucket to store decrypted logs
  DevDecryptedLogsBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: rds-das-dev-decrypted-logs # New S3 Bucket in dev account
      LifecycleConfiguration:
        Rules:
          - Status: Enabled
            Transitions:
              - StorageClass: GLACIER
                TransitionInDays: 30

  # IAM Role for Lambda to read from Kinesis and write to S3
  DevDecryptLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      PermissionsBoundary: !Sub arn:aws:iam::${AWS::AccountId}:policy/ScopePermissions
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: DevDecryptLambdaPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:GetRecords
                  - kinesis:GetShardIterator
                  - kinesis:DescribeStream
                  - kinesis:ListStreams
                Resource: !Ref SourceKinesisStreamARN
              - Effect: Allow
                Action:
                  - s3:PutObject
                Resource: arn:aws:s3:::rds-das-dev-decrypted-logs/*
              - Effect: Allow
                Action:
                  - kms:Decrypt
                Resource: !Ref KMSKeyARN
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'

  # Lambda function to decrypt the logs and store them in S3
  DevDASDecryptLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Runtime: python3.10
      Architectures:
        - arm64
      Timeout: 60
      MemorySize: 512
      Role: !GetAtt DevDecryptLambdaRole.Arn
      Environment:
        Variables:
          S3_BUCKET: !Ref DevDecryptedLogsBucket # S3 bucket for decrypted logs
          KMS_KEY_ID: !Ref KMSKeyARN # KMS key ARN for decryption
          REGION_NAME: !Ref Region # Region for decryption
          RESOURCE_ID: !Ref ResourceId # RDS Aurora cluster resource ID (used as KMS encryption context)
          STREAM_NAME: !Ref SourceKinesisStreamARN # Kinesis stream ARN
      Code:
        ZipFile: |
          import json
          import zlib
          import aws_encryption_sdk
          import boto3
          import base64
          import os
          from aws_encryption_sdk import CommitmentPolicy
          from aws_encryption_sdk.internal.crypto import WrappingKey
          from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
          from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType

          # Configuration for AWS resources and region, read from environment variables
          REGION_NAME = os.environ['REGION_NAME']
          RESOURCE_ID = os.environ['RESOURCE_ID']
          STREAM_NAME = os.environ['STREAM_NAME']

          # Initialize the AWS Encryption SDK client with a specific commitment policy
          enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT)

          # Custom key provider class for raw encryption/decryption operations
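          # The plaintext data key returned by kms:Decrypt is wrapped as a raw AES-256-GCM key,
          # letting the AWS Encryption SDK unwrap the envelope-encrypted DAS payload locally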
          class MyRawMasterKeyProvider(RawMasterKeyProvider):
              provider_id = "BC"  # Custom provider ID

              def __new__(cls, *args, **kwargs):
                  # Overriding the object creation process for proper initialization
                  obj = super(RawMasterKeyProvider, cls).__new__(cls)
                  return obj

              def __init__(self, plain_key):
                  # Initializing the parent class and setting up a wrapping key
                  super().__init__()
                  self.wrapping_key = WrappingKey(
                      wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
                      wrapping_key=plain_key,
                      wrapping_key_type=EncryptionKeyType.SYMMETRIC)

              def _get_raw_key(self, key_id):
                  # Method to retrieve the raw key; here, it returns the initialized wrapping key
                  return self.wrapping_key

          # Class for caching decrypted data keys using AWS KMS
          class KMSDataKeyCache():
              def __init__(self, session):
                  # Initialize the KMS client and a simple dictionary for caching keys
                  self.kms_client = session.client('kms', region_name=REGION_NAME)
                  self.key_cache = {}

              def getDecrypted(self, data_key_decoded):
                  # Attempt to retrieve the decrypted key from cache or decrypt it using KMS
                  if data_key_decoded in self.key_cache:
                      print("Cache hit for data key.")
                      return self.key_cache[data_key_decoded]
                  else:
                      print(f"Decrypting data key: {data_key_decoded}")
                      # Decrypt the key using KMS and store it in the cache
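                      # KMS binds the encryption context to the ciphertext as additional
                      # authenticated data; it must exactly match the context used when
                      # RDS generated the data key, or Decrypt raises InvalidCiphertextException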
                      data_key_decrypt_result = self.kms_client.decrypt(
                          CiphertextBlob=data_key_decoded,
                          EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})
                      self.key_cache[data_key_decoded] = data_key_decrypt_result['Plaintext']
                      print(f"Data key decrypted successfully.")
                      return data_key_decrypt_result['Plaintext']

          # Function to decrypt payload with a provided data key
          def decrypt_payload(payload, data_key):
              print(f"Decrypting payload with data key: {data_key}")  # Debugging: Print the full data key
              # Setup the key provider and decrypt the payload
              my_key_provider = MyRawMasterKeyProvider(data_key)
              my_key_provider.add_master_key("DataKey")
              decrypted_plaintext, header = enc_client.decrypt(
                  source=payload,
                  materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(
                      master_key_provider=my_key_provider))
              print(f"Decrypted payload: {decrypted_plaintext}")  # Debugging: Print the full decrypted payload
              return decrypted_plaintext

          # Function to decrypt and then decompress the payload
          def decrypt_decompress(payload, key):
              print(f"Decompressing payload...")  # Debugging: Indicate decompression
              decrypted = decrypt_payload(payload, key)
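              # MAX_WBITS + 16 tells zlib to expect a gzip wrapper around the DEFLATE stream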
              decompressed = zlib.decompress(decrypted, zlib.MAX_WBITS + 16)
              print(f"Decompressed payload: {decompressed}")  # Debugging: Print the full decompressed payload
              return decompressed

          # The main Lambda handler function
          def lambda_handler(event, context):
              # Initialize a session and the KMS data key cache
              print("Initializing KMS Data Key Cache...")
              session = boto3.session.Session()
              kms_data_key_cache = KMSDataKeyCache(session)

              # Process each record in the event
              for record in event['Records']:
                  try:
                      print(f"Processing record: {json.dumps(record)}")  # Debugging: Print the full record
                      # Decode and parse the incoming data
                      data = base64.b64decode(record['kinesis']['data'])
                      print(f"Base64-decoded data: {data}")  # Debugging: Print the full base64-decoded data
                      record_data = json.loads(data)
                      print(f"Decoded record data: {json.dumps(record_data)}")  # Debugging: Print the full decoded record data
                      payload_decoded = base64.b64decode(record_data['databaseActivityEvents'])
                      print(f"Decoded payload: {payload_decoded}")  # Debugging: Print the full decoded payload
                      data_key_decoded = base64.b64decode(record_data['key'])
                      print(f"Decoded data key: {data_key_decoded}")  # Debugging: Print the full decoded data key

                      # Get the decrypted data key from the cache or KMS
                      decrypted_data_key = kms_data_key_cache.getDecrypted(data_key_decoded)
                      print(f"Decrypted data key: {decrypted_data_key}")  # Debugging: Print the full decrypted data key

                      # Decrypt and decompress the payload
                      decrypted_decompressed_payload = decrypt_decompress(payload_decoded, decrypted_data_key)
                      plaintext = decrypted_decompressed_payload.decode('utf-8')
                      print(f"Decrypted and decompressed payload: {plaintext}")  # Debugging: Print the full plaintext

                      # Load the JSON events and log them
                      events = json.loads(plaintext)
                      print("Processed events:", events)  # Debugging: Print the full events processed

                  except Exception as e:
                      # Log any errors encountered during processing
                      print(f"Error processing record: {str(e)}")

              # Return a success status code and message
              return {
                  'statusCode': 200,
                  'body': json.dumps('Processing Complete')
              }

  # Event source mapping to trigger the Lambda function from Kinesis stream
  DevDecryptLambdaEventSource:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 100
      EventSourceArn: !Ref SourceKinesisStreamARN
      FunctionName: !Ref DevDASDecryptLambda
      StartingPosition: LATEST

Outputs:
  DevDecryptedLogsBucketName:
    Description: The S3 bucket where decrypted logs will be stored
    Value: rds-das-dev-decrypted-logs

Solution

  • Fixed! The database cluster's Resource ID (DbClusterResourceId) has the format cluster-xxxxxxx, which is not the same as the cluster identifier I was passing. That resource ID is the value that must be supplied in the aws:rds:dbc-id encryption context; once I specified it, decryption worked.
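
For reference, here is a minimal sketch (assuming boto3; the cluster identifier and region are placeholders taken from the template above) of how to look up the correct value:

import boto3

REGION_NAME = "eu-central-1"
CLUSTER_IDENTIFIER = "srv-rds-cluster-databasecluster-xxx"  # placeholder cluster identifier

rds = boto3.client("rds", region_name=REGION_NAME)

# DescribeDBClusters returns the immutable resource ID (format "cluster-XXXXXXXXXXXX"),
# which is the value RDS uses for the "aws:rds:dbc-id" encryption context of DAS data keys.
cluster = rds.describe_db_clusters(DBClusterIdentifier=CLUSTER_IDENTIFIER)["DBClusters"][0]
print(cluster["DbClusterResourceId"])  # e.g. cluster-ABCDEFGHIJKLMNOP

Passing this value as the ResourceId template parameter makes the kms:Decrypt call succeed: KMS treats the encryption context as additional authenticated data, so a mismatched aws:rds:dbc-id surfaces as InvalidCiphertextException rather than an access-denied error.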