I'm trying to write data from a Flink SQL job to a DynamoDB table, but I'm consistently getting the following error:
Caused by: software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The provided key element does not match the schema (Service: DynamoDb, Status Code: 400, Request ID: ...)
The error occurs even though my data appears correct, and I'm sure the data types align. I've logged the output of my Flink job before the sink, and the restaurant_id looks valid.
The DynamoDB table has a simple primary key. The partition key is restaurant_id, and there is no sort key on the main table.
dynamodb:
  OnlineFeatures:
    attributes:
      - name: "restaurant_id"
        type: "S"
      ...
    hash_key: "restaurant_id"
    # No sort key defined for the main table
    global_secondary_indexes:
      - name: "country_index"
        hash_key: "_country"
        range_key: "_training_timestamp"
        projection_type: "ALL"
The GSI uses a composite key, but my Flink job is writing to the main table, not the index.
Here are the Flink SQL CREATE TABLE and INSERT INTO statements I'm using. The restaurant_id is correctly cast to STRING.
-- Flink Table Definition for the DynamoDB sink
CREATE TABLE dynamodb_sink_table (
  _window_end STRING,
  ...
  restaurant_id STRING,
  ...
) PARTITIONED BY ( restaurant_id )
WITH (
  'connector' = 'dynamodb',
  'table-name' = '{dynamodb_table_name}',
  'aws.region' = '{config.aws_region}',
  'sink.ignore-nulls' = 'true'
);

-- The Flink query that populates the sink table
INSERT INTO dynamodb_sink_table
SELECT
  ...,
  CAST(dfr.delco_restaurant_id AS STRING) as restaurant_id,
  ...
FROM df_rests dfr
LEFT JOIN ...
WHERE dfr.restaurant_id IS NOT NULL;
The Flink job processes data correctly, and the print sink shows valid restaurant_id values. The DynamoDB error suggests a mismatch in the key schema, not the data itself.
What configuration am I missing, or what is Flink doing that would cause it to send a key structure that doesn't match the simple partition key of my DynamoDB table? How can I force Flink to use only restaurant_id as the key for its write requests?
Thank you.
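You're inserting CAST(dfr.delco_restaurant_id AS STRING) as restaurant_id, but your WHERE clause checks dfr.restaurant_id. Those are different columns, so rows where delco_restaurant_id is NULL still pass the filter. With 'sink.ignore-nulls' = 'true', a NULL restaurant_id is likely removed from the item before it is written, so DynamoDB receives an item without its partition key, which can surface as this key-schema error.

Make sure you're filtering on the same column you're inserting, and as well as checking that it's not null, check that it's not an empty string: DynamoDB does not accept an empty string for a key attribute.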
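A minimal sketch of the corrected INSERT, assuming delco_restaurant_id is the column you actually want as the partition key. The parts elided in the question stay as `...`, so this is a fragment to merge into your own query, not a complete statement:

```sql
-- The Flink query that populates the sink table (corrected filter)
INSERT INTO dynamodb_sink_table
SELECT
  ...,
  CAST(dfr.delco_restaurant_id AS STRING) AS restaurant_id,
  ...
FROM df_rests dfr
LEFT JOIN ...
-- Filter on the same column that becomes the DynamoDB partition key,
-- rejecting both NULL and empty-string values
WHERE dfr.delco_restaurant_id IS NOT NULL
  AND CAST(dfr.delco_restaurant_id AS STRING) <> '';
```

Filtering on the cast expression rather than on a different column keeps the check aligned with exactly what ends up in restaurant_id.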