I am trying to read data from an event outbox table and then have two sinks: one to push the event to a Kafka topic, and the other to update the table in the same database, using Flink. Since I want both sinks to have exactly-once delivery semantics, I am using the JDBC sink for the database side. Below is the code for this. Flink version: 1.16.2
Map<String, String> map = processor.getStreamConfigMap();
EnvironmentSettings settings = processor.getEnvironmentSettings(map);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

tEnv.registerCatalog(ApplicationConstants.CATALOG_NAME, processor.createCatalog());
tEnv.useCatalog(ApplicationConstants.CATALOG_NAME);

DataStream<Row> resultStream = processor.fetchData(tEnv);
resultStream.keyBy(row -> row.getField("id")).addSink(
        JdbcSink.exactlyOnceSink(
                "update event_log set event_status = 'SUCCESS' where id = ?",
                (preparedStatement, row) -> preparedStatement.setString(1, (String) row.getField("id")),
                JdbcExecutionOptions.builder()
                        .withMaxRetries(0)
                        .build(),
                JdbcExactlyOnceOptions.builder()
                        .withTransactionPerConnection(true)
                        .build(),
                () -> {
                    // create a driver-specific XA DataSource
                    PGXADataSource xaDataSource = new org.postgresql.xa.PGXADataSource();
                    xaDataSource.setUrl(processor.applicationConfig.getDbConfig().getJdbcUrl());
                    xaDataSource.setUser(processor.dbUsername);
                    xaDataSource.setPassword(processor.dbPassword);
                    xaDataSource.setCurrentSchema(processor.applicationConfig.getDbConfig().getSchema());
                    return xaDataSource;
                }));
I am getting the below error from the addSink call:
The implementation of the AbstractRichFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2317)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:202)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1244)
at org.apache.flink.streaming.api.datastream.KeyedStream.addSink(KeyedStream.java:302)
at com.xxx.enterprise.xxx.dataprocessor.processor.EventOutboxProcessor.lambda$main$0(EventOutboxProcessor.java:119)
at io.micrometer.core.instrument.AbstractTimer.record(AbstractTimer.java:223)
at com.xxx.enterprise.xxx.dataprocessor.processor.EventOutboxProcessor.main(EventOutboxProcessor.java:97)
The lambda didn't work because it captures the enclosing processor object, which is not serializable, so Flink's closure cleaner rejects the sink function during addSink. I fixed it by creating a class that implements SerializableSupplier<XADataSource> and holds only serializable String fields.
import javax.sql.XADataSource;

import org.apache.flink.util.function.SerializableSupplier;
import org.postgresql.xa.PGXADataSource;

public class PGXADataSourceProvider implements SerializableSupplier<XADataSource> {

    // Holding only String fields keeps the supplier serializable.
    private final String jdbcUrl;
    private final String username;
    private final String password;

    public PGXADataSourceProvider(String jdbcUrl, String username, String password) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
    }

    @Override
    public XADataSource get() {
        // The XA DataSource is created lazily on the task manager, after deserialization.
        PGXADataSource xaDataSource = new PGXADataSource();
        xaDataSource.setUrl(jdbcUrl);
        xaDataSource.setUser(username);
        xaDataSource.setPassword(password);
        xaDataSource.setCurrentSchema("public"); // note: schema is hardcoded here
        return xaDataSource;
    }
}
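For reference, the lambda approach from the original snippet would also have worked if it captured only serializable values; the failure happened because it captured the whole processor object. A minimal sketch of that alternative, assuming the same config getters return plain Strings:

// Copy the values into String locals first, so the lambda captures only
// serializable Strings instead of the non-serializable processor instance.
final String jdbcUrl = processor.applicationConfig.getDbConfig().getJdbcUrl();
final String user = processor.dbUsername;
final String password = processor.dbPassword;
final String schema = processor.applicationConfig.getDbConfig().getSchema();

SerializableSupplier<XADataSource> xaSupplier = () -> {
    PGXADataSource xaDataSource = new PGXADataSource();
    xaDataSource.setUrl(jdbcUrl);
    xaDataSource.setUser(user);
    xaDataSource.setPassword(password);
    xaDataSource.setCurrentSchema(schema);
    return xaDataSource;
};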
I then used PGXADataSourceProvider in the sink like below:
JdbcSink.exactlyOnceSink(
        "update " + applicationConfig.getDbConfig().getSchema() + ".event_log set event_status = 'SUCCESS' where id = ?",
        (preparedStatement, row) -> preparedStatement.setString(1, (String) row.f0.getField("id")),
        JdbcExecutionOptions.builder()
                .withMaxRetries(0) // the XA sink requires maxRetries = 0, otherwise retries could produce duplicates
                .build(),
        JdbcExactlyOnceOptions.builder()
                .withAllowOutOfOrderCommits(false)
                .withTransactionPerConnection(true)
                .build(),
        new PGXADataSourceProvider(
                applicationConfig.getDbConfig().getJdbcUrl() + "/" + applicationConfig.getDbConfig().getDatabase(),
                applicationConfig.getDbConfig().getUsername(),
                applicationConfig.getDbConfig().getPassword()))
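One related note: the exactly-once JDBC sink only commits its XA transactions when a checkpoint completes, so checkpointing must be enabled on the job, and for end-to-end exactly-once the Kafka sink needs DeliveryGuarantee.EXACTLY_ONCE with a transactional id prefix. A minimal sketch of that side (the checkpoint interval, broker address, topic name, and String serialization are placeholders, not from my actual job):

// XA transactions are prepared during the checkpoint and committed once it completes.
env.enableCheckpointing(30_000); // placeholder interval

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092") // placeholder
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("event-topic") // placeholder
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("event-outbox") // required for EXACTLY_ONCE
        .build();

// Wired up only to illustrate; the real job serializes the event payload properly.
resultStream.map(Row::toString).sinkTo(kafkaSink);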