springjdbcspring-integrationpoller

Spring integration poller triggering with the wrong time interval


Code: https://github.com/giuliopulina/spring-integration-poller

I have a problem trying to create a jdbc poller with Spring integration.

When i feed the table with new data, the processing is slower than expected: everything works fine, apart the fact that the poll is triggered every 60 seconds, and I can't understand why.

2015-05-27 10:50:40,234 DEBUG ExpressionEvaluatingSqlParameterSourceFactory - Resolved expression #root.![pk] to (list of pks)

2015-05-27 10:51:40,234 DEBUG ExpressionEvaluatingSqlParameterSourceFactory - Resolved expression #root.![pk] to (list of pks)

This is the relevant part of the spring integration configuration xml:

<task:executor id="pollerPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/>

<!--<task:executor id="processingPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/> -->

<bean id="jdbcSource" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter">
    <constructor-arg ref="dataSource"/>
    <constructor-arg value="XXXXXXXXXXXXXX"/>
    <property name="updateSql" value="XXXXXXXXXXXXXXXX"/>
    <property name="maxRowsPerPoll" value="50"/>
</bean>

<int:inbound-channel-adapter send-timeout="10000" auto-startup="false" id="inboundAdapter" ref="jdbcSource" channel="jdbcOutputChannel">
    <int:poller receive-timeout="3000" time-unit="MILLISECONDS" fixed-rate="0" error-channel="errorChannel" task-executor="pollerPool">
        <int:advice-chain>
            <ref bean="threadPrepareInterceptor"/>
            <ref bean="txAdvice"/>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<int:service-activator id="serviceActivator" input-channel="jdbcOutputChannel" ref="someServiceActivatorBean"/>

<tx:advice id="txAdvice" transaction-manager="txManager">
    <tx:attributes>
        <tx:method name="get*" read-only="true"/>
        <tx:method name="*"/>
    </tx:attributes>
</tx:advice>

<int:channel id="jdbcOutputChannel"  >
    <!-- using direct channel -->
    <!--<int:dispatcher task-executor="processingPool"/>-->
</int:channel>

Could you please help me understand the issue?

UPDATE:

About "jdbcOutputChannel" suggestion on transactions, I agree and I modified my configuration according to your hint because it's cleaner (anyway, also service activator was running in a separate transaction, even if wasn't outlined in the xml sample).

About the issue I have, I tried to remove all other spring integration components and the poller is continuosly triggered as I expected (I know the fixed-rate=0 is way too high :) ) Instead, when having other pollers in the project configured like this, also my poller seems to inherit the same timeout:

<int:service-activator id="someOtherServiceActivator">
    <int:poller fixed-rate="0" error-channel="someOtherPollerErrorChannel" receive-timeout="60000" />
</int:service-activator>

Switching the timeout of the other pollers to 10000ms, also my poller is triggered every 10 seconds (instead of 60). I cannot share the complete spring integration configuration, but I would like to ask: it can be possible that completely separated poller can modify each other behaviour?

UPDATE 2: I created a separated project trying to reproduce the issue, but still I couldn't manage to do it. So, I tried to remove the following configuration, that it was been introduced in order to start the pollers only when the application is fully up and running:

<int:publish-subscribe-channel id="startupChannel" />
<int:control-bus input-channel="controlBusChannel" />

<int-event:inbound-channel-adapter channel="startupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>

<int:transformer input-channel="startupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="startupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" /> 

And the problem has disappeared, even if this I can fully understand the reason. Anyway, creating a different startupChannel for my poller works perfectly:

<int:publish-subscribe-channel id="globalStartupChannel" />
<int:publish-subscribe-channel id="myStartupChannel" />
<int:control-bus input-channel="controlBusChannel" />

<int-event:inbound-channel-adapter channel="globalStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int-event:inbound-channel-adapter channel="myStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>

<int:transformer input-channel="globalStartupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="myStartupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />

UPDATE 3:

While preparing the project with the code for you I noticed the following log:

Info: No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.

So, I added the following configuration and now everything is working fine:

<task:scheduler id="taskScheduler" pool-size="20" />

I guess that the default pool-size is 10, so in some way the configuration is overwritten when having totalNumberOfPollers > taskScheduler.size(). Am I right?

Thanks Giulio


Solution

  • I can't reproduce your situation; I suggest you take a thread dump between polls to see what the thread is doing.

    That said, a fixed-rate of 0 is incredibly aggressive; your DBAs will probably throw a fit polling with no delay like that.

    Also, jdbcOutputChannel, being an ExecutorChannel, means the transaction will commit immediately after the message is sent to that channel. If you want the flow to run in the transactions, you should not use a dispatcher here.

    EDIT:

    I still can't reproduce your situation with this...

    <int:control-bus input-channel="input"/>
    
    <int-event:inbound-channel-adapter channel="ps" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
    
    <int:publish-subscribe-channel id="ps" />
    
    <int:transformer input-channel="ps" output-channel="input" expression="'@foo.start()'" />
    
    <int:transformer input-channel="ps" output-channel="input" expression="'@sa.start()'" />
    
    <int:inbound-channel-adapter id="foo" channel="bar" expression="'foo'" auto-startup="false">
        <int:poller fixed-rate="1000" />
    </int:inbound-channel-adapter>
    
    <int:channel id="bar">
        <int:queue />
    </int:channel>
    
    <int:service-activator id="sa" input-channel="bar" output-channel="baz" auto-startup="false"
            expression="payload.toUpperCase()">
        <int:poller fixed-rate="6000" receive-timeout="0" />
    </int:service-activator>
    
    <int:logging-channel-adapter id="baz" level="ERROR"/>
    

    ... as expected I see 6 FOOs every 6 seconds (the i-c-a is polled once a second and the sa runs once every 6 seconds).

    EDIT2:

    I looked at your project and the root cause of your issue is, as you say, many polled endpoints, but really, it's this:

    fixed-rate="0" receive-timeout="60000"
    

    With this configuration, the scheduler resources (threads) are blocked in the QueueChannels and, as you have found, you have exhausted all the resources.

    One solution is to increase the number of threads in the scheduler pool.

    With this configuration, it seems you are trying to get on-demand, zero latency messaging with a poller by having the poller constantly waiting in the queue receive() method.

    If you can't afford any latency, consider using DirectChannels instead. If you don't want the downstream endpoint to run on the caller's thread, use ExecutorChannels...

    <task:executor id="exec" pool-size="100"/>
    
    <int:channel id="otherMessageChannel1">
        <int:dispatcher task-executor="exec" />
    </int:channel>
    

    This is generally preferred over your current setup.