I have a Spring Boot application that uses Amazon Kinesis to consume data and saves it to PostgreSQL. Since I'm already using one database (PostgreSQL) in my application, I want to avoid using another database (DynamoDB) only for checkpointing and locking, so that the resource cost can be reduced.
Expected: I want to replace the default DynamoDB with PostgreSQL for checkpointing and locking.
I am using the following dependency in my project: implementation 'org.springframework.cloud:spring-cloud-stream-binder-kinesis:4.0.2'
My application.yml file:
spring:
  cloud:
    aws:
      credentials:
        sts:
          web-identity-token-file: <Where i had given the token file path>
          role-arn: <Where i had given the assume role arn>
          role-session-name: RoleSessionName
      region:
        static: <where i had given my aws region>
      dualstack-enabled: false
    stream:
      kinesis:
        binder:
          auto-create-stream: false
          min-shard-count: 1
      bindings:
        input-in-0:
          destination: test-test.tst.v1
          content-type: text/json
Below is the Java class that contains the bean for processing data from Kinesis:
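import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class KinesisConsumerBinder {

    @Bean
    public Consumer<Message<String>> input() {
        return message -> {
            System.out.println("Data from Kinesis:" + message.getPayload());
            // Process the message received from Kinesis
        };
    }
}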
I have tried many approaches but could not find the right solution.
I also tried the following steps:
Created a table "KinesisCheckpoint" and its entity class, with the fields shard ID, sequence number and stream name.
Created a Spring Data JPA repository interface for handling CRUD operations on the checkpoint data.
Updated the consumer code as below to save the checkpoint information (shard ID, sequence number and stream name) to the "KinesisCheckpoint" table after successfully processing a batch of records.
@Configuration
public class KinesisConsumerBinder {

    // checkPointRepo is the Spring Data JPA repository described above; it must be injected into this class.

    @Bean
    public Consumer<Message<String>> input() {
        return message -> {
            System.out.println("Data from Kinesis:" + message.getPayload());
            String strmNme = (String) message.getHeaders().get("aws_receivedStream");
            String shrdId = (String) message.getHeaders().get("aws_shard");
            // Read the sequence number header here, not the shard header again
            String seqNo = (String) message.getHeaders().get("aws_receivedSequenceNumber");
            // Repository query that returns the last checkpoint for the given stream name and shard ID
            String lstChckPntDta = checkPointRepo.findLastChckpoint(strmNme, shrdId);
            // Process the message received from Kinesis
            if (null == lstChckPntDta) {
                // Save the new checkpoint info (shard ID, sequence number and stream name) to the "KinesisCheckpoint" table
            } else {
                // Update the checkpoint data (sequence number) for the matching stream name and shard ID
            }
        };
    }
}
As the last step, when the application starts or restarts, I retrieve the saved checkpoint information from the "KinesisCheckpoint" table, and I want to use it to resume processing from where it left off.
How can I use the information collected in the "KinesisCheckpoint" table to resume processing from where it left off?
I don't think the above approach is a good one. Is there a better way to achieve checkpointing and locking with a PostgreSQL database?
Could anyone please help me resolve this?
The locking logic in the KinesisMessageDrivenChannelAdapter is based on the LockRegistry. The checkpointing logic is based on the ConcurrentMetadataStore.
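To make the two contracts concrete, here is a minimal, self-contained sketch in plain Java. It does not use the binder's real classes: CheckpointSketch, its key layout and the forward-only rule are assumptions for illustration only. The map plays the role of a ConcurrentMetadataStore (putIfAbsent and replace are methods of that interface, mirrored here by ConcurrentMap), and the per-key lock plays the role of what LockRegistry.obtain(key) hands out.

```java
import java.math.BigInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-memory stand-in; not the binder's actual implementation.
class CheckpointSketch {

    // Plays the role of a ConcurrentMetadataStore: key -> last processed sequence number.
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Plays the role of a LockRegistry: one lock per shard key.
    private final ConcurrentMap<String, Lock> locks = new ConcurrentHashMap<>();

    Lock obtain(String key) {
        return locks.computeIfAbsent(key, k -> new ReentrantLock());
    }

    // Moves the checkpoint forward only; returns true if the stored value changed.
    boolean checkpoint(String key, String sequenceNumber) {
        String existing = store.putIfAbsent(key, sequenceNumber);
        if (existing == null) {
            return true; // first checkpoint for this shard
        }
        if (new BigInteger(existing).compareTo(new BigInteger(sequenceNumber)) >= 0) {
            return false; // stale or duplicate sequence number
        }
        // Atomic compare-and-set: fails if another consumer updated the value first.
        return store.replace(key, existing, sequenceNumber);
    }

    String lastCheckpoint(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        CheckpointSketch sketch = new CheckpointSketch();
        // Hypothetical key layout: group, stream and shard joined with ':'
        String key = "my-group:test-test.tst.v1:shardId-000000000000";
        Lock lock = sketch.obtain(key);
        lock.lock();
        try {
            sketch.checkpoint(key, "100");
            sketch.checkpoint(key, "90"); // ignored: older than the stored value
            sketch.checkpoint(key, "250");
            System.out.println(sketch.lastCheckpoint(key)); // prints 250
        } finally {
            lock.unlock();
        }
    }
}
```

A restart then only needs to read the stored value for the shard key and continue after it, which is why no hand-written checkpoint table is required once a persistent store backs these two contracts.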
Yes, by default they are DynamoDbLockRegistry and DynamoDbMetadataStore, respectively. The docs don't mention it, but the auto-configuration beans for LockRegistry and ConcurrentMetadataStore can be overridden from end-user configuration. That way you can use a JdbcLockRegistry and a JdbcMetadataStore based on the DataSource for your PostgreSQL.
You would need the schema-postgresql.sql script from the spring-integration-jdbc jar to initialize the DB properly. This way you would not need any of the custom logic you describe.
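A minimal sketch of such an override could look like the following. It assumes that spring-integration-jdbc is on the classpath, that Spring Boot already provides a DataSource (and a transaction manager) for your PostgreSQL, and that the tables from schema-postgresql.sql have been created. The class name and bean method names are illustrative only; what should matter is the bean types, LockRegistry and ConcurrentMetadataStore.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.integration.jdbc.lock.LockRepository;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.support.locks.LockRegistry;

// Illustrative configuration class; the name is an assumption, not something the binder requires.
@Configuration
public class JdbcCheckpointConfiguration {

    // Repository that reads and writes the lock table created by schema-postgresql.sql.
    @Bean
    public LockRepository lockRepository(DataSource dataSource) {
        return new DefaultLockRepository(dataSource);
    }

    // Intended to take the place of the default DynamoDbLockRegistry for shard locking.
    @Bean
    public LockRegistry lockRegistry(LockRepository lockRepository) {
        return new JdbcLockRegistry(lockRepository);
    }

    // Intended to take the place of the default DynamoDbMetadataStore for checkpoints.
    @Bean
    public ConcurrentMetadataStore checkpointStore(DataSource dataSource) {
        return new JdbcMetadataStore(dataSource);
    }
}
```

With beans like these in place, the consumer function from the question can stay as it was originally, without any manual checkpoint handling.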
See more info in docs:
https://docs.spring.io/spring-integration/reference/jdbc/lock-registry.html
https://docs.spring.io/spring-integration/reference/jdbc/metadata-store.html