spring spring-boot spring-integration spring-integration-dsl

Spring Integration: concurrent execution of JDBC inbound channel adapter


I have a Spring Integration microservice with an integration flow that starts from a JDBC inbound channel adapter.

When using a JDBC inbound channel adapter in an integration flow, we can set an update query that marks the rows just read as processed, so they are not picked up again. However, if we run a replica of the microservice, the adapters in both instances may query the DB at the same time. Both then receive the same result set, which causes duplicate processing.

I have prevented this by writing a custom data source that uses transactions together with a SELECT ... FOR UPDATE query, which lets us synchronize the concurrent DB reads.

Is there any way to achieve this with the JDBC inbound channel adapter itself, without a custom implementation like the one above?


Solution

  • When you run multiple instances of the microservice (e.g., in Kubernetes or for scaling), both instances can read the same rows from the database before they are marked as processed. This leads to duplicate processing.

    You can avoid this by using standard Spring Integration features, without a custom DataSource.

    Use SELECT ... FOR UPDATE SKIP LOCKED in your selectQuery:

    SELECT * FROM my_table WHERE processed = false FOR UPDATE SKIP LOCKED
    

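    The first instance to run the query locks the rows it reads; any other instance running the same query skips those locked rows instead of waiting for them or reading them again. Row locks are only held while a transaction is open, so the poll has to run in a transaction.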
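
    The effect of SKIP LOCKED can be illustrated with a small in-memory simulation. This is only a sketch in plain Java, not JDBC and not anything Spring Integration does internally: each row carries a ReentrantLock standing in for the database row lock, tryLock() plays the role of SKIP LOCKED, and the class and method names (SkipLockedSimulation, poll, run) are made up for this example.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.ReentrantLock;

    // In-memory stand-in for my_table; illustration only, no JDBC involved.
    class SkipLockedSimulation {

        static final class Row {
            final ReentrantLock lock = new ReentrantLock();           // plays the DB row lock
            final AtomicInteger timesProcessed = new AtomicInteger(); // should end up as 1
            boolean processed;                                        // the "processed" column
        }

        // One poll: claim every unprocessed row whose lock is free, skip the others.
        static int poll(List<Row> table) {
            List<Row> claimed = new ArrayList<>();
            for (Row row : table) {
                // tryLock() never blocks, like SKIP LOCKED: a row held by another
                // poller is skipped instead of being waited for.
                if (!row.lock.tryLock()) {
                    continue;
                }
                if (row.processed) {
                    row.lock.unlock();  // already handled by an earlier poll
                } else {
                    claimed.add(row);   // lock stays held until "commit"
                }
            }
            for (Row row : claimed) {
                row.timesProcessed.incrementAndGet(); // downstream processing
                row.processed = true;                 // the update query
            }
            for (Row row : claimed) {
                row.lock.unlock();                    // commit releases the row locks
            }
            return claimed.size();
        }

        // Runs `pollers` concurrent polls over `rowCount` rows; returns per-row process counts.
        static int[] run(int rowCount, int pollers) {
            List<Row> table = new ArrayList<>();
            for (int i = 0; i < rowCount; i++) {
                table.add(new Row());
            }
            CountDownLatch start = new CountDownLatch(1);
            List<Thread> threads = new ArrayList<>();
            for (int p = 0; p < pollers; p++) {
                Thread t = new Thread(() -> {
                    try {
                        start.await();  // release all pollers at the same time
                        poll(table);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                threads.add(t);
                t.start();
            }
            start.countDown();
            try {
                for (Thread t : threads) {
                    t.join();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            int[] counts = new int[rowCount];
            for (int i = 0; i < rowCount; i++) {
                counts[i] = table.get(i).timesProcessed.get();
            }
            return counts;
        }
    }
    ```

    Calling SkipLockedSimulation.run(1000, 4) should return an array in which every entry is 1: each row is claimed by exactly one poller, no matter how the threads interleave.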

    Enable transactional polling:

    Pollers.fixedDelay(5000)
           .transactional(new DataSourceTransactionManager(dataSource))
    

    Put together in an integration flow, the adapter's select and update run inside the poller's transaction, so the row locks are held until that transaction commits or rolls back. For example:

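    @Bean
    public JdbcPollingChannelAdapter jdbcMessageSource(DataSource dataSource) {
        JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(dataSource,
                "SELECT * FROM my_table WHERE processed = false FOR UPDATE SKIP LOCKED");
        // Runs once per poll for all rows read; :id expands to the ids of those rows
        adapter.setUpdateSql("UPDATE my_table SET processed = true WHERE id IN (:id)");
        adapter.setRowMapper(customRowMapper());
        adapter.setMaxRows(10);
        return adapter;
    }

    @Bean
    public IntegrationFlow jdbcPollingFlow(DataSource dataSource) {
        // IntegrationFlows is the 5.x entry point; on 6.x and later use IntegrationFlow.from(...)
        return IntegrationFlows.from(
                jdbcMessageSource(dataSource),
                c -> c.poller(Pollers.fixedDelay(5000)
                        .transactional(new DataSourceTransactionManager(dataSource))))
            .handle(messageProcessor())
            .get();
    }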
    

    With this setup you don't need a custom DataSource: duplicate processing is avoided using only the standard JDBC adapter and Spring's transaction support.
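
    For comparison, the claim-and-mark unit of work that the transactional poll performs can be sketched in plain JDBC. This is an illustration under assumptions, not the code the adapter actually runs: the ClaimAndMark class and its methods are hypothetical, the table and column names are taken from the example above, and the database is assumed to support FOR UPDATE SKIP LOCKED (for example PostgreSQL 9.5+, MySQL 8.0+, or Oracle).

    ```java
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Hypothetical helper: shows the claim-and-mark transaction in plain JDBC.
    class ClaimAndMark {

        static final String SELECT_SQL =
                "SELECT id FROM my_table WHERE processed = false FOR UPDATE SKIP LOCKED";

        // Builds "UPDATE ... WHERE id IN (?, ?, ...)" with one placeholder per claimed row.
        static String updateSql(int idCount) {
            String placeholders = String.join(", ", Collections.nCopies(idCount, "?"));
            return "UPDATE my_table SET processed = true WHERE id IN (" + placeholders + ")";
        }

        // Locks the unprocessed rows nobody else holds, marks them processed, and commits.
        static List<Long> claim(Connection connection) throws SQLException {
            boolean previousAutoCommit = connection.getAutoCommit();
            connection.setAutoCommit(false); // row locks last until commit or rollback
            try {
                List<Long> ids = new ArrayList<>();
                try (Statement select = connection.createStatement();
                     ResultSet rs = select.executeQuery(SELECT_SQL)) {
                    while (rs.next()) {
                        ids.add(rs.getLong("id"));
                    }
                }
                if (!ids.isEmpty()) {
                    try (PreparedStatement update =
                                 connection.prepareStatement(updateSql(ids.size()))) {
                        for (int i = 0; i < ids.size(); i++) {
                            update.setLong(i + 1, ids.get(i));
                        }
                        update.executeUpdate();
                    }
                }
                connection.commit(); // releases the locks; the rows now have processed = true
                return ids;
            } catch (SQLException e) {
                connection.rollback(); // rows become visible to other pollers again
                throw e;
            } finally {
                connection.setAutoCommit(previousAutoCommit);
            }
        }
    }
    ```

    The select and the update have to share one connection and one transaction, which is what the transactional poller arranges for the adapter. Without a transaction the lock taken by the select is released as soon as the statement completes, and a second instance can read the same rows before they are marked.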