java, postgresql, spring-boot, spring-cloud-stream, amazon-kinesis

Can we use PostgreSQL instead of the default DynamoDB for checkpointing and locking when consuming data from Kinesis using the binder approach?


I have a Spring Boot application that uses Amazon Kinesis to consume data and save it to PostgreSQL. Since I'm already using one database (PostgreSQL) in my application, I want to avoid using another database (DynamoDB) solely for checkpointing and locking, so that the resource cost can be reduced.

Expecting: I want to replace the default DynamoDB with PostgreSQL for checkpointing and locking.

I'm using the following dependency in my project:

implementation 'org.springframework.cloud:spring-cloud-stream-binder-kinesis:4.0.2'

My application.yml file:

spring:
  cloud:
    aws:
      credentials:
        sts:
          web-identity-token-file: <Where i had given the token file path>
          role-arn: <Where i had given the assume role arn>
          role-session-name: RoleSessionName
      region:
        static: <where i had given my aws region>
      dualstack-enabled: false
    stream:
      kinesis:
        binder:
          auto-create-stream: false
          min-shard-count: 1
      bindings:
        input-in-0:
          destination: test-test.tst.v1
          content-type: text/json

Below is the Java class that contains the bean for processing data from Kinesis:

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class KinesisConsumerBinder {

   @Bean
   public Consumer<Message<String>> input() {
      return message -> {
         System.out.println("Data from Kinesis: " + message.getPayload());
         // Process the message received from Kinesis
      };
   }
}

I have tried many approaches, but I am not able to find the right solution.

I have also tried the following steps:

  1. Created a table "KinesisCheckpoint" and its entity class, containing the fields shard ID, sequence number, and stream name (a sketch of this entity and its repository follows the consumer code below).

  2. Created a Spring Data JPA repository interface for handling CRUD operations on the checkpoint data.

  3. Updated the consumer code as below to save the checkpoint information (shard ID, sequence number, and stream name) to the "KinesisCheckpoint" table after successfully processing a batch of records.

import java.util.function.Consumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class KinesisConsumerBinder {

   @Autowired
   private KinesisCheckpointRepository checkPointRepo;

   @Bean
   public Consumer<Message<String>> input() {
      return message -> {
         System.out.println("Data from Kinesis: " + message.getPayload());
         String strmNme = (String) message.getHeaders().get("aws_receivedStream");
         String shrdId = (String) message.getHeaders().get("aws_shard");
         String seqNo = (String) message.getHeaders().get("aws_receivedSequenceNumber");
         // Repository query that returns the last checkpoint for the given stream name and shard ID
         String lstChckPntDta = checkPointRepo.findLastChckpoint(strmNme, shrdId);

         // Process the message received from Kinesis

         if (null == lstChckPntDta) {
            // Save a new checkpoint (shard ID, sequence number, and stream name) to the "KinesisCheckpoint" table
         } else {
            // Update the checkpoint data (sequence number) for the given stream name and shard ID
         }
      };
   }
}
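
For reference, here is a minimal sketch of the entity and repository from steps 1 and 2 (class, table, and method names are illustrative, not taken from any library; the derived query plays the role of the findLastChckpoint call above):

import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;

@Entity
@Table(name = "kinesis_checkpoint")
public class KinesisCheckpoint {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String streamName;
    private String shardId;
    private String sequenceNumber;

    // getters and setters omitted for brevity
}

import java.util.Optional;

import org.springframework.data.jpa.repository.JpaRepository;

public interface KinesisCheckpointRepository extends JpaRepository<KinesisCheckpoint, Long> {

    // Last checkpoint stored for the given stream and shard
    Optional<KinesisCheckpoint> findFirstByStreamNameAndShardIdOrderByIdDesc(String streamName, String shardId);
}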

As the last step, when the application starts or restarts, I retrieve the saved checkpoint information from the "KinesisCheckpoint" table, and I want to use it to resume processing from where it left off.

How can I use the information collected in the "KinesisCheckpoint" table to resume processing from where it left off?

The above approach is probably not the best one. Is there a better way to achieve checkpointing and locking using a PostgreSQL database?

Could anyone please help me resolve this?


Solution

  • The locking logic in the KinesisMessageDrivenChannelAdapter is based on the LockRegistry; the checkpointing logic is based on the ConcurrentMetadataStore. Yes, by default they are DynamoDbLockRegistry & DynamoDbMetadataStore, respectively. We don't mention it in the docs, but the auto-configured beans for those LockRegistry & ConcurrentMetadataStore can be overridden from the end-user configuration. That way you can use a JdbcLockRegistry & JdbcMetadataStore based on the DataSource for your PostgreSQL.

    You would need the schema-postgresql.sql from the spring-integration-jdbc jar to initialize the database properly. This way you would not need any of the custom logic you are describing right now. A sketch of such a configuration follows the links below.

    See more info in the docs:

    https://docs.spring.io/spring-integration/reference/jdbc/lock-registry.html

    https://docs.spring.io/spring-integration/reference/jdbc/metadata-store.html
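
    A minimal sketch of such an override configuration, assuming Spring Boot has already auto-configured a PostgreSQL DataSource (bean names here are illustrative; the point is to expose JDBC-backed LockRegistry and ConcurrentMetadataStore beans so they take the place of the DynamoDB defaults):

import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.integration.jdbc.lock.LockRepository;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;

@Configuration
public class PostgresCheckpointConfig {

    // Creates the INT_LOCK and INT_METADATA_STORE tables at startup using the
    // schema-postgresql.sql script shipped inside the spring-integration-jdbc jar
    @Bean
    public DataSourceInitializer integrationSchemaInitializer(DataSource dataSource) {
        ResourceDatabasePopulator populator = new ResourceDatabasePopulator(
                new ClassPathResource("org/springframework/integration/jdbc/schema-postgresql.sql"));
        populator.setContinueOnError(true); // tolerate tables that already exist
        DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDataSource(dataSource);
        initializer.setDatabasePopulator(populator);
        return initializer;
    }

    // Shard leasing/locking backed by the INT_LOCK table instead of DynamoDbLockRegistry
    @Bean
    public LockRepository lockRepository(DataSource dataSource) {
        return new DefaultLockRepository(dataSource);
    }

    @Bean
    public LockRegistry lockRegistry(LockRepository lockRepository) {
        return new JdbcLockRegistry(lockRepository);
    }

    // Shard checkpoints (sequence numbers) backed by the INT_METADATA_STORE table
    // instead of DynamoDbMetadataStore
    @Bean
    public ConcurrentMetadataStore kinesisCheckpointStore(DataSource dataSource) {
        return new JdbcMetadataStore(dataSource);
    }
}

    With these beans in place, the binder persists shard checkpoints and leases in PostgreSQL automatically, so the custom "KinesisCheckpoint" table and repository described in the question are no longer needed.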