I have a Spring Boot application that uses Amazon Kinesis to consume data and saves it to PostgreSQL. Since I'm already using one database (PostgreSQL) in my application, I want to avoid adding another database (DynamoDB) just for checkpointing and locking, so that resource costs can be reduced.
I'm using the following dependency in my project:
'org.springframework.cloud:spring-cloud-stream-binder-kinesis:4.0.2'
My application.yml file:
spring:
  cloud:
    aws:
      credentials:
        sts:
          web-identity-token-file: <Where i had given the token file path>
          role-arn: <Where i had given the assume role arn>
          role-session-name: RoleSessionName
      region:
        static: <where i had given my aws region>
      dualstack-enabled: false
    stream:
      kinesis:
        binder:
          auto-create-stream: false
          min-shard-count: 1
      bindings:
        input-in-0:
          destination: test-test.tst.v1
          content-type: text/json
Below is the Java class that contains the bean for processing data from Kinesis:
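import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class KinesisConsumerBinder {

    // The function name "input" maps to the "input-in-0" binding in application.yml.
    @Bean
    public Consumer<Message<String>> input() {
        return message -> {
            System.out.println("Data from Kinesis:" + message.getPayload());
            // Process the message received from Kinesis
        };
    }
}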
As per my previous question, "Can we use PostgreSQL instead of default dynamo db for checkpointing and locking in case of consuming data from Kinesis Using Binder approach", I applied the solution provided there and it works for me. I was able to use PostgreSQL for checkpointing and locking. However, I'm now facing the issue below.
In the processing logic for messages consumed from Kinesis, I wrote code that saves each message to the database (PostgreSQL). Two pods are running in my environment, connected to the same stream and the same database in a load-balanced manner. I'm seeing two rows inserted into my table for each message. Below is sample (dummy, masked) metadata that was saved in the checkpointing and locking tables:
anonymous.6168ad-9a13-5b75-96b9-996f340dfd:test-test.tst.v1:shardId-000000001 : 8954146227109765333558818710934653698471926607 : DEFAULT
anonymous.7168ad-8b13-5c75-96n9-996f340dfd:test-test.tst.v1:shardId-000000001 : 8954146227109765333558818710934653698471926607 : DEFAULT
44444444-7df8-2222-db33-c0bbb07bbbb8 : DEFAULT : a9a9a9a9-8888-4444-777d-99a33aa20122 : 2024-03-26 09:28:29.662
55555555-dddd-3333-b000-8810ae858a84 : DEFAULT : a9a9a9a9-8888-4444-777d-99a33aa20122 : 2024-03-26 09:28:29.662
66666666-eeee-4444-a805-6138a0c59976 : DEFAULT : a9a9a9a9-8888-4444-777d-99a33aa20122 : 2024-03-26 09:28:29.662
Could anyone please help me solve this issue?
Expected: only one row should be saved in the database for each message from Kinesis.
The metadata store is for checkpoint tracking in the shard. In a perfect world we wouldn't need it at all. It is only there so that, after a system restart, records from the shard that have already been processed are not consumed again.
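To make the checkpoint idea concrete, here is a minimal in-memory sketch. It is a toy model and not the binder's implementation: the class `CheckpointSketch`, its methods, and the use of `long` sequence numbers are assumptions made for illustration (real Kinesis sequence numbers are much larger strings). The key layout only mirrors the `group:stream:shard` shape visible in the sample rows of the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model only: none of these names come from the Kinesis binder.
class CheckpointSketch {

    // Stand-in for the metadata store table: key -> last processed sequence number.
    private final Map<String, Long> store = new ConcurrentHashMap<>();

    // Hypothetical key layout, shaped like the "group:stream:shard" sample rows.
    static String key(String group, String stream, String shard) {
        return group + ":" + stream + ":" + shard;
    }

    // Returns the records that would be processed: everything above the stored
    // checkpoint. The checkpoint is then advanced to the last processed record.
    List<Long> consume(String key, List<Long> sequenceNumbers) {
        long checkpoint = store.getOrDefault(key, -1L);
        List<Long> processed = new ArrayList<>();
        for (long seq : sequenceNumbers) {
            if (seq > checkpoint) {
                processed.add(seq);
                checkpoint = seq;
            }
        }
        store.put(key, checkpoint);
        return processed;
    }

    public static void main(String[] args) {
        CheckpointSketch sketch = new CheckpointSketch();
        String k = key("group-a", "test-test.tst.v1", "shardId-000000001");
        // First run: nothing is checkpointed yet, so all records are processed.
        System.out.println(sketch.consume(k, List.of(1L, 2L, 3L)));     // [1, 2, 3]
        // "Restart": records 1..3 are skipped, only the new record is processed.
        System.out.println(sketch.consume(k, List.of(1L, 2L, 3L, 4L))); // [4]
    }
}
```

In this model a checkpoint only protects the key it is stored under, so a consumer that uses a different key starts from scratch and sees every record. The two sample rows in the question carry different `anonymous.*` prefixes for the same stream and shard, which may be worth checking against how the consumer group is configured.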
The distributed lock logic is there to provide exclusive access to the shard, independently of the number of competing consumers for that shard.
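The exclusive-access idea can be sketched in the same toy style. Again, this is not the binder's or Spring Integration's lock registry: `ShardLockSketch`, `tryAcquire`, and `release` are hypothetical names, and the sketch ignores lock expiry and renewal, which a real distributed lock needs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model only: an in-memory stand-in for a shared lock table.
class ShardLockSketch {

    // Lock key (for example "group:stream:shard") -> id of the client holding it.
    private final Map<String, String> owners = new ConcurrentHashMap<>();

    // Atomically claims the lock if it is free; re-acquiring by the owner succeeds.
    boolean tryAcquire(String lockKey, String clientId) {
        String current = owners.putIfAbsent(lockKey, clientId);
        return current == null || current.equals(clientId);
    }

    // Releases the lock only if the caller is the current owner.
    void release(String lockKey, String clientId) {
        owners.remove(lockKey, clientId);
    }

    public static void main(String[] args) {
        ShardLockSketch locks = new ShardLockSketch();
        String shardKey = "group-a:test-test.tst.v1:shardId-000000001";
        System.out.println(locks.tryAcquire(shardKey, "pod-1")); // true
        System.out.println(locks.tryAcquire(shardKey, "pod-2")); // false
        locks.release(shardKey, "pod-1");
        System.out.println(locks.tryAcquire(shardKey, "pod-2")); // true
    }
}
```

As with the checkpoint sketch, exclusivity holds per lock key: consumers only compete with each other when they ask for the same key.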
Both of these tables have nothing to do with the data you are consuming from the Kinesis stream. Your concern about each message and these tables is not clear.