apache-flink, flink-streaming, flink-sql

The implementation of the AbstractRichFunction is not serializable when using JDBC Sink in Flink


I am trying to read data from an event outbox table and then have two sinks: one to push the event to a Kafka topic and the other to update the table in the same database using Flink. Since I want these two sinks to have exactly-once delivery semantics, I am using the JDBC sink for that. Below is the code for this. Flink version: 1.16.2

Map<String, String> map = processor.getStreamConfigMap();
EnvironmentSettings settings = processor.getEnvironmentSettings(map);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

tEnv.registerCatalog(ApplicationConstants.CATALOG_NAME, processor.createCatalog());
tEnv.useCatalog(ApplicationConstants.CATALOG_NAME);

DataStream<Row> resultStream = processor.fetchData(tEnv);
resultStream.keyBy(row -> row.getField("id")).addSink(
        JdbcSink.exactlyOnceSink(
                "update event_log set event_status = 'SUCCESS' where id = ?",
                (preparedStatement, row) -> preparedStatement.setString(1, (String) row.getField("id")),
                JdbcExecutionOptions.builder()
                        .withMaxRetries(0)
                        .build(),
                JdbcExactlyOnceOptions.builder()
                        .withTransactionPerConnection(true)
                        .build(),
                () -> {
                    // create a driver-specific XA DataSource
                    PGXADataSource xaDataSource = new org.postgresql.xa.PGXADataSource();
                    xaDataSource.setUrl(processor.applicationConfig.getDbConfig().getJdbcUrl());
                    xaDataSource.setUser(processor.dbUsername);
                    xaDataSource.setPassword(processor.dbPassword);
                    xaDataSource.setCurrentSchema(processor.applicationConfig.getDbConfig().getSchema());
                    return xaDataSource;
                }));
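
The Kafka sink is omitted above and is not the part that fails. For reference, a minimal sketch of an exactly-once KafkaSink in Flink 1.16, with the bootstrap servers, topic name, and Row-to-String mapping as placeholders:

    // Sketch only: broker address, topic, and payload field are placeholders.
    // Uses org.apache.flink.connector.kafka.sink.* and org.apache.flink.connector.base.DeliveryGuarantee.
    KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
            .setBootstrapServers("broker:9092")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("event-outbox-topic")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            // EXACTLY_ONCE uses Kafka transactions, which require a transactional id prefix
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("event-outbox-")
            .build();

    resultStream
            .map(row -> (String) row.getField("payload"))
            .sinkTo(kafkaSink);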

I am getting the below error near the addSink method.

The implementation of the AbstractRichFunction is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2317)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:202)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1244)
    at org.apache.flink.streaming.api.datastream.KeyedStream.addSink(KeyedStream.java:302)
    at com.xxx.enterprise.xxx.dataprocessor.processor.EventOutboxProcessor.lambda$main$0(EventOutboxProcessor.java:119)
    at io.micrometer.core.instrument.AbstractTimer.record(AbstractTimer.java:223)
    at com.xxx.enterprise.xxx.dataprocessor.processor.EventOutboxProcessor.main(EventOutboxProcessor.java:97)

Solution

  • The lambda didn't work: most likely it captured the enclosing processor object, which is not serializable, so the generated sink function failed Flink's closure-cleaner serializability check. I fixed it by creating a class that implements SerializableSupplier and holds only serializable String fields.

    import javax.sql.XADataSource;

    import org.apache.flink.util.function.SerializableSupplier;
    import org.postgresql.xa.PGXADataSource;

    public class PGXADataSourceProvider implements SerializableSupplier<XADataSource> {
        private final String jdbcUrl;
        private final String username;
        private final String password;

        public PGXADataSourceProvider(String jdbcUrl, String username, String password) {
            this.jdbcUrl = jdbcUrl;
            this.username = username;
            this.password = password;
        }

        @Override
        public XADataSource get() {
            // Called on the task managers; the XA data source is built lazily here,
            // so it never has to be serialized with the job graph.
            PGXADataSource xaDataSource = new PGXADataSource();
            xaDataSource.setUrl(jdbcUrl);
            xaDataSource.setUser(username);
            xaDataSource.setPassword(password);
            xaDataSource.setCurrentSchema("public");
            return xaDataSource;
        }
    }
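
    For what it's worth, a lambda should also pass the closure cleaner as long as it captures only serializable values; the class just makes that explicit. A sketch of that variant, assuming the URL, user, and password are first copied into local Strings so the non-serializable processor is never captured:

    // Sketch: capture only local String copies, never the enclosing processor.
    final String jdbcUrl = processor.applicationConfig.getDbConfig().getJdbcUrl();
    final String user = processor.dbUsername;
    final String password = processor.dbPassword;

    SerializableSupplier<XADataSource> dataSourceSupplier = () -> {
        PGXADataSource xaDataSource = new PGXADataSource();
        xaDataSource.setUrl(jdbcUrl);
        xaDataSource.setUser(user);
        xaDataSource.setPassword(password);
        return xaDataSource;
    };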
    

    and used this provider class in the sink like below:

    
    JdbcSink.exactlyOnceSink(
            "update " + applicationConfig.getDbConfig().getSchema() + ".event_log set event_status = 'SUCCESS' where id = ?",
            (preparedStatement, row) -> preparedStatement.setString(1, (String) row.f0.getField("id")),
            JdbcExecutionOptions.builder()
                    .withMaxRetries(0)
                    .build(),
            JdbcExactlyOnceOptions.builder()
                    .withAllowOutOfOrderCommits(false)
                    .withTransactionPerConnection(true)
                    .build(),
            new PGXADataSourceProvider(
                    applicationConfig.getDbConfig().getJdbcUrl() + "/" + applicationConfig.getDbConfig().getDatabase(),
                    applicationConfig.getDbConfig().getUsername(),
                    applicationConfig.getDbConfig().getPassword()
            ))
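
Note that the exactly-once JDBC sink commits its XA transactions when checkpoints complete, so checkpointing must be enabled on the StreamExecutionEnvironment; a minimal sketch (the 30-second interval is just an example value):

    // org.apache.flink.streaming.api.CheckpointingMode in Flink 1.16
    env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);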