Tags: amazon-dynamodb, schema, flink-streaming, flink-sql, pyflink

Flink DynamoDB Sink: "The provided key element does not match the schema"


I'm trying to write data from a Flink SQL job to a DynamoDB table, but I'm consistently getting the following error:

Caused by: software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The provided key element does not match the schema (Service: DynamoDb, Status Code: 400, Request ID: ...)

The error occurs even though my data appears correct, and I'm sure the data types align. I've logged the output of my Flink job before the sink, and the restaurant_id looks valid.

DynamoDB Table Schema

The DynamoDB table has a simple primary key. The partition key is restaurant_id, and there is no sort key on the main table.

dynamodb:
  OnlineFeatures:
    attributes:
        - name: "restaurant_id"
          type: "S"
        ...
    hash_key: "restaurant_id"
    # No sort key defined for the main table
    global_secondary_indexes:
      - name: "country_index"
        hash_key: "_country"
        range_key: "_training_timestamp"
        projection_type: "ALL"

The GSI uses a composite key, but my Flink job is writing to the main table, not the index.

Flink SQL Code

Here are the Flink SQL CREATE TABLE and INSERT INTO statements I'm using. The restaurant_id is correctly cast to STRING.

-- Flink Table Definition for the DynamoDB sink
CREATE TABLE dynamodb_sink_table (
    _window_end STRING,
    ...
    restaurant_id STRING,
    ...
) PARTITIONED BY ( restaurant_id )
WITH (
    'connector' = 'dynamodb',
    'table-name' = '{dynamodb_table_name}',
    'aws.region' = '{config.aws_region}',
    'sink.ignore-nulls' = 'true'
);

-- The Flink query that populates the sink table
INSERT INTO dynamodb_sink_table
SELECT 
    ...,
    CAST(dfr.delco_restaurant_id AS STRING) as restaurant_id,
    ...
FROM df_rests dfr 
LEFT JOIN ...
WHERE dfr.restaurant_id IS NOT NULL;

The Problem

The Flink job processes data correctly, and the print sink shows valid restaurant_id values. The DynamoDB error suggests a mismatch in the key schema, not the data itself.

What configuration am I missing or what is Flink doing that would cause it to send a key structure that doesn't match the simple partition key of my DynamoDB table? How can I force Flink to only use restaurant_id as the key for its write requests?

Thank you.


Solution

  • You're inserting

        CAST(dfr.delco_restaurant_id AS STRING) as restaurant_id

    but your WHERE clause filters on dfr.restaurant_id, a different attribute. Rows where delco_restaurant_id is NULL can pass that filter, and since restaurant_id is the table's partition key, DynamoDB rejects those writes, which can surface as exactly this error.

  • Make sure you filter on the same attribute you're inserting. In addition to the NULL check, also verify the value is not an empty string, since DynamoDB does not accept empty strings as key attribute values.
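A corrected version of the INSERT might look like the sketch below (the elided column lists and join are kept from the original question; only the WHERE clause changes, so that it guards the same column that feeds the partition key):

-- Filter on the same column that feeds the partition key,
-- and reject empty strings as well as NULLs.
INSERT INTO dynamodb_sink_table
SELECT
    ...,
    CAST(dfr.delco_restaurant_id AS STRING) as restaurant_id,
    ...
FROM df_rests dfr
LEFT JOIN ...
WHERE dfr.delco_restaurant_id IS NOT NULL
  AND CAST(dfr.delco_restaurant_id AS STRING) <> '';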