amazon-web-services amazon-kinesis aws-dms

How to put changes from AWS DMS into a specific shard in AWS Kinesis


I have a database with 40 tables. I am using AWS DMS to capture the data changes from these tables and push them into an AWS Kinesis stream that has 2 shards. Eventually, my two consumer applications will read the data from Kinesis and update their respective databases using custom logic.

The problem is that I couldn't find how to route records to a specific shard in Kinesis. For example, I need the changes from the first 20 tables placed in shard 1 and the changes from the next 20 placed in shard 2. But when I checked the documentation, it seems that the shard is chosen based on the hash of the partition_key.

Can anyone shed some light on how I can achieve this?

I have tried different approaches listed online and in the official documentation, but I couldn't achieve what I expected. I want to understand how this can be achieved, or at least whether it can be achieved at all.


Solution

  • The way to send records to a single shard in a multi-shard stream is to use the same partition key.

    As described here, Kinesis takes the MD5 hash of the partition key value to assign a shard. Each shard owns a range of hash keys, which you can get with the DescribeStream API. For example, here are the hash key ranges for a 2-shard stream:

    [
      {
        "ShardId": "shardId-000000000000",
        "HashKeyRange": {
          "StartingHashKey": "0",
          "EndingHashKey": "170141183460469231731687303715884105727"
        }
      },
      {
        "ShardId": "shardId-000000000001",
        "HashKeyRange": {
          "StartingHashKey": "170141183460469231731687303715884105728",
          "EndingHashKey": "340282366920938463463374607431768211455"
        }
      }
    ]
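
    A minimal sketch of reading these ranges programmatically, assuming the boto3 Kinesis client and its describe_stream call. The helper name shard_ranges and the stream name my-stream are made up for illustration, and paging through streams with many shards is not handled:

```python
def shard_ranges(kinesis_client, stream_name):
    """Return (shard_id, start, end) tuples, with the hash keys parsed as ints.

    Hypothetical helper: `kinesis_client` is expected to behave like
    boto3.client("kinesis"). Only the first page of shards is read.
    """
    description = kinesis_client.describe_stream(StreamName=stream_name)
    shards = description["StreamDescription"]["Shards"]
    return [
        (
            shard["ShardId"],
            int(shard["HashKeyRange"]["StartingHashKey"]),
            int(shard["HashKeyRange"]["EndingHashKey"]),
        )
        for shard in shards
    ]
```

    With credentials configured, this would be called as shard_ranges(boto3.client("kinesis"), "my-stream").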
    

    These hash keys, while represented as strings, are actually numbers in the range 0 .. 2**128-1 (where ** is the Python exponentiation operator). As you can see, with a two-shard stream the range is divided equally between the two shards, with the first shard containing hashes up to (hex) 7fffffffffffffffffffffffffffffff, and the second shard containing hashes above this value.
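
    The boundary arithmetic can be checked with a few lines of Python. This is only a sanity check on the numbers from the example output; the variable names are mine:

```python
# Boundaries copied from the two-shard example output.
first_shard_end = int("170141183460469231731687303715884105727")
second_shard_start = int("170141183460469231731687303715884105728")
second_shard_end = int("340282366920938463463374607431768211455")

# The first shard ends at 2**127 - 1, exactly half of the 128-bit range.
assert first_shard_end == 2**127 - 1
assert second_shard_start == 2**127
assert second_shard_end == 2**128 - 1

# Zero-padded to 32 hex digits, the same width as an MD5 digest.
first_shard_end_hex = f"{first_shard_end:032x}"
print(first_shard_end_hex)  # 7fffffffffffffffffffffffffffffff
```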

    So, to put records into a specific shard, you just need to find a partition key value whose MD5 hash falls within that shard's range. For example:

    echo -n "1" | md5sum
    c4ca4238a0b923820dcc509a6f75849b  -
    
    echo -n "2" | md5sum
    c81e728d9d4c2f636f067f89cc14862c  -
    
    echo -n "3" | md5sum
    eccbc87e4b5ce2fe28308fd9f2a7baf3  -
    
    echo -n "4" | md5sum
    a87ff679a2f3e71d9181a67b7542122c  -
    
    echo -n "5" | md5sum
    e4da3b7fbbce2345d7772b0674a318d5  -
    
    echo -n "6" | md5sum
    1679091c5a880faf6fb5e6087eb1b2dc  -
    

    So from this, you can see that values "1" to "5" will go into the second shard, while "6" goes into the first. If you want to ensure that all of the rows for some set of tables go into a particular shard, use the appropriate value as the partition key for those tables.
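
    The same lookup can be sketched in Python, assuming the two-shard ranges from the example stream. The names hash_key, shard_for_key and find_partition_key are hypothetical helpers, not part of any AWS SDK, and the search simply tries "0", "1", "2", ... as candidate keys:

```python
import hashlib

# Shard ranges copied from the example stream description.
SHARDS = [
    {
        "ShardId": "shardId-000000000000",
        "HashKeyRange": {
            "StartingHashKey": "0",
            "EndingHashKey": "170141183460469231731687303715884105727",
        },
    },
    {
        "ShardId": "shardId-000000000001",
        "HashKeyRange": {
            "StartingHashKey": "170141183460469231731687303715884105728",
            "EndingHashKey": "340282366920938463463374607431768211455",
        },
    },
]


def hash_key(partition_key):
    """MD5 of the UTF-8 key, read as an unsigned 128-bit big-endian integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_for_key(partition_key, shards=SHARDS):
    """Return the ShardId whose hash key range contains the key's hash."""
    value = hash_key(partition_key)
    for shard in shards:
        key_range = shard["HashKeyRange"]
        start = int(key_range["StartingHashKey"])
        end = int(key_range["EndingHashKey"])
        if start <= value <= end:
            return shard["ShardId"]
    raise ValueError(f"no shard covers the hash of {partition_key!r}")


def find_partition_key(shard_id, shards=SHARDS, limit=10000):
    """Try "0", "1", "2", ... and return the first key that maps to shard_id."""
    for candidate in range(limit):
        key = str(candidate)
        if shard_for_key(key, shards) == shard_id:
            return key
    raise ValueError(f"no key found for {shard_id} in the first {limit} candidates")


for key in ["1", "2", "3", "4", "5", "6"]:
    print(key, shard_for_key(key))
```

    The ranges are hard-coded here, so after a reshard the lookup would need the new ranges from the stream description.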

    Why this might not give you the results you want

    This technique ensures that all of the related data goes to the same shard, even after a reshard.

    However, it does not guarantee order. I go into that more in this answer, but the short version is that the order in which Kinesis accepts records does not have to match the order in which those records were generated.

    The reason most likely to apply in your case is that Kinesis may reject individual records from a PutRecords request, typically because the shard has reached its write limit. If you're feeding change-data-capture events into the stream, then a burst of activity against the underlying tables could cause this to happen.
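
    To illustrate how a partial failure reorders records, here is a small sketch. It relies on the documented shape of a PutRecords response, where the result entries line up with the request entries and rejected ones carry an ErrorCode. The helper name rejected_entries and the sample response values are invented for the example, and in your setup it is DMS, not your own code, that issues the writes:

```python
def rejected_entries(request_records, response):
    """Return the request entries that Kinesis rejected.

    Hypothetical helper. PutRecords results are positional, so entry i of
    response["Records"] describes entry i of the request.
    """
    return [
        entry
        for entry, result in zip(request_records, response["Records"])
        if "ErrorCode" in result
    ]


# Three change events for the same table, generated in this order.
request_records = [
    {"Data": b"change-1", "PartitionKey": "6"},
    {"Data": b"change-2", "PartitionKey": "6"},
    {"Data": b"change-3", "PartitionKey": "6"},
]

# Hand-written response: the middle record was throttled.
sample_response = {
    "FailedRecordCount": 1,
    "Records": [
        {"SequenceNumber": "1", "ShardId": "shardId-000000000000"},
        {
            "ErrorCode": "ProvisionedThroughputExceededException",
            "ErrorMessage": "Rate exceeded",
        },
        {"SequenceNumber": "2", "ShardId": "shardId-000000000000"},
    ],
}

to_retry = rejected_entries(request_records, sample_response)
print(to_retry)
```

    Here change-2 has to be sent again, so it lands in the shard after change-3 even though all three records share a partition key.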