java postgresql spring-boot spring-cloud-stream amazon-kinesis

Checkpointing and Locking in Amazon Kinesis using PostgreSQL


I have a Spring Boot application that uses Amazon Kinesis to consume data and save it to PostgreSQL. Since I'm already using one database (PostgreSQL) in my application, I want to avoid using another database (DynamoDB) only for checkpointing and locking, so that the resource cost can be reduced.

I'm using the dependency below in my project:

implementation 'org.springframework.cloud:spring-cloud-stream-binder-kinesis:4.0.2'

My application.yml file:

spring:
  cloud:
    aws:
      credentials:
        sts:
          web-identity-token-file: <where I have given the token file path>
          role-arn: <where I have given the assume role ARN>
          role-session-name: RoleSessionName
      region:
        static: <where I have given my AWS region>
      dualstack-enabled: false
    stream:
      kinesis:
        binder:
          auto-create-stream: false
          min-shard-count: 1
      bindings:
        input-in-0:
          destination: test-test.tst.v1
          content-type: text/json

Below is the Java class that contains the bean for processing data from Kinesis (the input-in-0 binding name maps to this input function bean):

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class KinesisConsumerBinder {

   @Bean
   public Consumer<Message<String>> input() {
      return message -> {
         System.out.println("Data from Kinesis: " + message.getPayload());
         // Process the message received from Kinesis
      };
   }
}

As per my previous question, Can we use PostgreSQL instead of default dynamo db for checkpointing and locking in case of consuming data from Kinesis Using Binder approach, I applied the solution provided there and it works for me: I was able to use PostgreSQL for checkpointing and locking.
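For context, the approach from that answer boils down to overriding the binder's default DynamoDB-backed stores with Spring Integration's JDBC equivalents. Below is a minimal sketch of such a configuration (it assumes the spring-integration-jdbc dependency is on the classpath and its PostgreSQL schema, which creates the int_metadata_store and int_lock tables, has been applied; the class and bean names here are illustrative):

import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.integration.jdbc.lock.LockRepository;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;

@Configuration
public class JdbcStoreConfig {

   // Checkpoints go to the INT_METADATA_STORE table instead of DynamoDB
   @Bean
   public JdbcMetadataStore kinesisCheckpointStore(DataSource dataSource) {
      return new JdbcMetadataStore(dataSource);
   }

   // Shard ownership locks go to the INT_LOCK table
   @Bean
   public DefaultLockRepository lockRepository(DataSource dataSource) {
      return new DefaultLockRepository(dataSource);
   }

   @Bean
   public JdbcLockRegistry lockRegistry(LockRepository lockRepository) {
      return new JdbcLockRegistry(lockRepository);
   }
}

With these beans in place, I'm facing the issue below.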

In the processing logic for the message consumed from Kinesis, I have written logic to save it into the database (PostgreSQL). There are two pods running in my environment, connecting to the same stream and the same database in a load-balancing manner. I'm seeing that two rows get inserted into my table for each message. Below is sample dummy masked metadata that got saved in the checkpointing tables.

Table: int_metadata_store

metadata_key | metadata_value | region
anonymous.6168ad-9a13-5b75-96b9-996f340dfd:test-test.tst.v1:shardId-000000001 | 8954146227109765333558818710934653698471926607 | DEFAULT
anonymous.7168ad-8b13-5c75-96n9-996f340dfd:test-test.tst.v1:shardId-000000001 | 8954146227109765333558818710934653698471926607 | DEFAULT

Table: int_lock

lock_key | region | client_id | created_date
44444444-7df8-2222-db33-c0bbb07bbbb8 | DEFAULT | a9a9a9a9-8888-4444-777d-99a33aa20122 | 2024-03-26 09:28:29.662
55555555-dddd-3333-b000-8810ae858a84 | DEFAULT | a9a9a9a9-8888-4444-777d-99a33aa20122 | 2024-03-26 09:28:29.662
66666666-eeee-4444-a805-6138a0c59976 | DEFAULT | a9a9a9a9-8888-4444-777d-99a33aa20122 | 2024-03-26 09:28:29.662

Could anyone please help me solve this issue?

Expecting: only one row should get saved in the DB for each message from Kinesis.


Solution

  • The metadata store is for checkpoint tracking per shard. In a perfect world we wouldn't need it at all; it is only there so that, after a system restart, records from the shard that were already processed are not consumed again.

    The distributed lock logic is there to provide exclusive access to a shard, independently of the number of competing consumers for that shard (roughly as sketched below).

    Neither of these tables has anything to do with the data you are consuming from the Kinesis stream, so your concern about each message in relation to these tables is not clear.
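    To make the locking part concrete, here is a rough sketch of how a shard consumer uses such a lock (the names here are illustrative, not the binder's actual internals; only the LockRegistry API is from Spring Integration):

    import java.util.concurrent.locks.Lock;

    import org.springframework.integration.support.locks.LockRegistry;

    public class ShardLockSketch {

       private final LockRegistry lockRegistry;

       public ShardLockSketch(LockRegistry lockRegistry) {
          this.lockRegistry = lockRegistry;
       }

       public void consumeShard(String shardKey) {
          // JdbcLockRegistry maps the key to a row in INT_LOCK, storing it as a
          // UUID derived from the key - which is why lock_key looks like a UUID
          Lock lock = this.lockRegistry.obtain(shardKey);
          if (lock.tryLock()) {
             try {
                // only the current lock holder reads records from this shard
             }
             finally {
                lock.unlock();
             }
          }
       }
    }

    So the rows you see in int_lock and int_metadata_store are bookkeeping for shard ownership and restart positions, not copies of your messages.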