I am new to Flink and have a use case where I consume data from Topic1, insert/update it in a DB, and also push the same data to Topic2, which will be used by other services. My current code looks something like this:
DataStream<FooModel> stream =
        env.fromSource(axon.source(Constants.CONSUMER_TOPIC_NAME, Constants.CONSUMER_GROUP_ID),
                        WatermarkStrategy.noWatermarks(), "foo-kafka-source")
                .map(axonMessage -> (FooModel) axonMessage.getPayload());
stream.addSink(jdbc.exactlyOnceSink(new FooJdbcSink()))
        .name("data-db-sink")
        .uid("data-db-sink");
stream.sinkTo(kafka.exactlyOnceSink(fooSchema))
        .name("data-kafka-sink")
        .uid("data-kafka-sink");
The requirement is to complete both sink operations as one transaction, i.e., if the data is not inserted into the DB (because of any error), it should not be pushed to Topic2 either. Is the sample code above enough (I also referred to this article), and if not, how do I manage transactions in Flink?
Any help will be appreciated.
Thanks!!!
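The short answer is yes. It is enough to configure all sinks with exactly-once semantics (with checkpointing enabled, since checkpoints are what drive the commits); the Flink runtime handles the rest. Flink achieves exactly-once semantics with a two-phase commit tied to checkpoints: each sink flushes (pre-commits) its pending Kafka or database transaction while the checkpoint is being taken, and commits it only after the checkpoint has completed successfully. If something fails before that, the transaction is not committed and the job restarts from the last successfully completed checkpoint, so you won't produce the same message twice. Note that the two sinks still commit independently once a checkpoint completes, so for a brief moment one commit may be visible before the other; if one of those commits fails, Flink retries it from the checkpointed state during recovery, so both sides eventually end up committed.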
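To make the mechanism concrete, here is a toy, stdlib-only Java model of checkpoint-driven two-phase commit across two sinks. It is not Flink's API: `ToySink` and its methods are hypothetical stand-ins for the DB and Topic2 sinks, each sink holds at most one pending transaction, and the "replay" step imitates the source re-reading from its checkpointed offset after a restart.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseCommitModel {

    /** Hypothetical transactional sink standing in for the DB or Topic2 (not a Flink class). */
    public static class ToySink {
        public final List<String> committed = new ArrayList<>(); // what readers can see
        private List<String> open = new ArrayList<>();           // current, uncommitted transaction
        private List<String> preCommitted = null;                // flushed, waiting for checkpoint completion

        public void write(String record) { open.add(record); }

        // Phase 1: runs while the checkpoint is being taken
        public void preCommit() {
            preCommitted = open;
            open = new ArrayList<>();
        }

        // Phase 2: runs only after the whole checkpoint has completed
        public void commit() {
            if (preCommitted != null) {
                committed.addAll(preCommitted);
                preCommitted = null;
            }
        }

        // On failure: everything not yet committed is discarded
        public void abort() {
            open = new ArrayList<>();
            preCommitted = null;
        }
    }

    private static void writeBoth(ToySink a, ToySink b, String record) {
        a.write(record);
        b.write(record);
    }

    // A successful checkpoint: both sinks pre-commit, then both commit
    private static void checkpoint(ToySink a, ToySink b) {
        a.preCommit();
        b.preCommit();
        a.commit();
        b.commit();
    }

    public static void main(String[] args) {
        ToySink db = new ToySink();
        ToySink topic2 = new ToySink();

        writeBoth(db, topic2, "a");
        writeBoth(db, topic2, "b");
        checkpoint(db, topic2);              // checkpoint 1 completes: a and b committed in both

        writeBoth(db, topic2, "c");
        db.abort();                          // job fails before checkpoint 2 completes
        topic2.abort();
        System.out.println(db.committed + " " + topic2.committed);  // [a, b] [a, b]

        writeBoth(db, topic2, "c");          // restart: source replays c from checkpoint 1's offset
        checkpoint(db, topic2);              // checkpoint 2 completes
        System.out.println(db.committed + " " + topic2.committed);  // [a, b, c] [a, b, c]
    }
}
```

In this simplified model, the record that was in flight at the failure is visible in neither sink until it is replayed and covered by a successful checkpoint, and it is never committed twice.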
However, this depends on your database, as some Flink database connectors do not support exactly-once semantics.
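In your example you are writing through JDBC. According to Flink's documentation, the JDBC connector supports exactly-once through JdbcSink.exactlyOnceSink, which relies on XA transactions, so your database and its driver must support XA. For example: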
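import org.apache.derby.jdbc.EmbeddedXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// assumes stream is a DataStream<Book>, where Book has public fields id, title, author, price, qty
stream.addSink(JdbcSink.exactlyOnceSink(
        "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
        (ps, t) -> {
            ps.setInt(1, t.id);
            ps.setString(2, t.title);
            ps.setString(3, t.author);
            ps.setDouble(4, t.price);
            ps.setInt(5, t.qty);
        },
        JdbcExecutionOptions.builder()
                // the XA sink requires 0 retries; retrying a batch could cause duplicates
                .withMaxRetries(0)
                .build(),
        JdbcExactlyOnceOptions.defaults(),
        () -> {
            // create a driver-specific XA DataSource
            // the following example is for Derby
            EmbeddedXADataSource ds = new EmbeddedXADataSource();
            ds.setDatabaseName("my_db");
            return ds;
        }));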
You can read more about that here.
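For Kafka, you need to configure your KafkaSink correctly and set DeliveryGuarantee.EXACTLY_ONCE as its delivery guarantee. In this mode the sink also needs a transactional ID prefix that is unique to your application. The services reading Topic2 should consume with isolation.level=read_committed; otherwise they can see records from transactions that have not been committed yet (or were later aborted), which is exactly what you want to avoid. Here's an example: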
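import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("topic-name")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build()
        )
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // required for EXACTLY_ONCE; "my-app" is a placeholder, use a prefix unique to your job
        .setTransactionalIdPrefix("my-app")
        // must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
        .setProperty("transaction.timeout.ms", "900000")
        .build();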
You can read more about that here.
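With DeliveryGuarantee.EXACTLY_ONCE, Kafka aborts any transaction left open longer than the producer's transaction.timeout.ms, and Flink's documentation recommends that this timeout exceed the maximum checkpoint duration plus the maximum restart time; at the same time, the broker rejects producers whose timeout is above its transaction.max.timeout.ms (15 minutes by default). Below is a small stdlib-only Java sanity check for those two constraints. `KafkaTxnTimeoutCheck` and `isSafe` are hypothetical helpers, not Flink or Kafka APIs, and the 2-minute checkpoint and 5-minute restart budgets are made-up inputs.

```java
import java.time.Duration;

public class KafkaTxnTimeoutCheck {

    // Kafka's default broker-side transaction.max.timeout.ms
    public static final Duration BROKER_DEFAULT_MAX = Duration.ofMinutes(15);

    public static boolean isSafe(Duration txnTimeout, Duration maxCheckpointDuration,
                                 Duration maxRestartDuration, Duration brokerMax) {
        // The open transaction must survive the slowest checkpoint plus a restart,
        // otherwise Kafka may abort it before Flink gets to commit it
        boolean outlivesFailure =
                txnTimeout.compareTo(maxCheckpointDuration.plus(maxRestartDuration)) > 0;
        // The broker refuses producers whose timeout is above its configured maximum
        boolean acceptedByBroker = txnTimeout.compareTo(brokerMax) <= 0;
        return outlivesFailure && acceptedByBroker;
    }

    public static void main(String[] args) {
        Duration txnTimeout = Duration.ofMinutes(15);
        boolean ok = isSafe(txnTimeout, Duration.ofMinutes(2), Duration.ofMinutes(5), BROKER_DEFAULT_MAX);
        System.out.println("transaction.timeout.ms=" + txnTimeout.toMillis() + " safe=" + ok);
        // prints: transaction.timeout.ms=900000 safe=true
    }
}
```

If the check passes, the millisecond value is what you would pass to the KafkaSink builder via setProperty("transaction.timeout.ms", ...); if your checkpoints or restarts can take longer, raise transaction.max.timeout.ms on the brokers instead.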