Tags: queue, spring-integration, poller

File poller activated either when a queue is full or a set amount of time passes


We've been asked to index report files from a folder. We need to index in batches, meaning we won't index until enough files have accumulated. However, once a certain amount of time has passed, we will index whatever is there even if the batch is incomplete. So we have these two conditions that trigger the same function.

We've managed to configure a queue, the inbound poller (for picking up files) and the outbound poller (for indexing files after x time passes). However, we still haven't been able to trigger the function when the queue is full. We're far from experts on Spring Integration, but believe me when I say we've been through the documentation multiple times and looked at countless example projects, and the answer still escapes us.

Enough words. Here, have some code:

ip-customer-reports-config:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                  http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                  http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">

    <bean id="customerReportsFileManager" class="xxx.xxx.xxx.xxx.reports.CustomerReportsFileManager" />

    <int:channel id="customerReportInputChannel" />

    <int-file:inbound-channel-adapter channel="customerReportInputChannel" directory="${customer.reports.directory}">
        <int:poller time-unit="SECONDS" fixed-delay="3" />
    </int-file:inbound-channel-adapter>

    <int:service-activator input-channel="customerReportInputChannel" output-channel="customerReportIndexationInputChannel"
                           ref="customerReportsFileManager" method="prepareFile" />

    <int:channel id="customerReportIndexationInputChannel">
        <int:queue capacity="3" /> <!-- Capacity -1 -->
    </int:channel>

    <int:outbound-channel-adapter channel="customerReportIndexationInputChannel" ref="customerReportsFileManager" method="indexReports">
        <int:poller time-unit="SECONDS" fixed-delay="60" />
    </int:outbound-channel-adapter>

</beans>

CustomerReportsFileManager:

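import java.io.File;

// Assuming SLF4J, which matches the LoggerFactory.getLogger(...) call below
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomerReportsFileManager {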
    private static final Logger LOGGER = LoggerFactory.getLogger(CustomerReportsFileManager.class);

    public File prepareFile(File file) {
        LOGGER.warn("[prepareFile] " + file.getPath());

        return file;
    }

    public void indexReports(File file) {
        LOGGER.warn("[indexReports] " + file.getPath());
    }
}

Solution

  • You are using the wrong approach.

    Don't use a queue for this. Simply write a custom FileListFilter and inject it into the file inbound channel adapter; the filter can return an empty list until either the required time has passed or the required number of files is present.
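A minimal sketch of such a filter might look like the following. It is an illustration under stated assumptions, not a tested drop-in. The class name BatchOrTimeoutFileListFilter, its constructor parameters and the injectable clock are all made up for this example. The small FileListFilter interface declared at the top is only a local stand-in so the snippet compiles on its own; in the real application you would delete it and implement org.springframework.integration.file.filters.FileListFilter<File> instead.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongSupplier;

// Local stand-in for org.springframework.integration.file.filters.FileListFilter<F>,
// declared here only so the sketch compiles without Spring on the classpath.
interface FileListFilter<F> {
    List<F> filterFiles(F[] files);
}

// Hypothetical filter: holds files back until a batch is complete or a timeout expires.
class BatchOrTimeoutFileListFilter implements FileListFilter<File> {
    private final int batchSize;       // number of files that forms a full batch
    private final long maxWaitMillis;  // how long an incomplete batch may wait
    private final LongSupplier clock;  // injectable time source, e.g. System::currentTimeMillis
    private long waitingSince = -1;    // when the first pending file was seen; -1 = not waiting

    BatchOrTimeoutFileListFilter(int batchSize, long maxWaitMillis, LongSupplier clock) {
        this.batchSize = batchSize;
        this.maxWaitMillis = maxWaitMillis;
        this.clock = clock;
    }

    @Override
    public synchronized List<File> filterFiles(File[] files) {
        if (files == null || files.length == 0) {
            waitingSince = -1; // nothing pending, so there is nothing to time out
            return new ArrayList<>();
        }
        long now = clock.getAsLong();
        if (waitingSince < 0) {
            waitingSince = now; // first poll that sees pending files starts the timer
        }
        boolean batchFull = files.length >= batchSize;
        boolean timedOut = now - waitingSince >= maxWaitMillis;
        if (batchFull || timedOut) {
            waitingSince = -1; // release everything and start over
            return new ArrayList<>(Arrays.asList(files));
        }
        return new ArrayList<>(); // keep holding the files back
    }
}
```

With the configuration from the question, this would be declared as a bean (for example with a batch size of 3, a wait of 60000 ms and System::currentTimeMillis as the clock) and referenced through the filter attribute of int-file:inbound-channel-adapter. Two caveats: the sketch assumes that indexed files are moved or deleted from the directory, since otherwise the same files would be released again on a later poll; and, as far as I know, the adapter still emits one message per released file, so indexReports would keep receiving files one at a time.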