spring-integration, messagestore, aggregator

Spring Integration Aggregator is sending expired/timed out messages to the wrong channel in a specific scenario


I am sending messages to the aggregator's input channel, and the aggregator releases the aggregated messages to the output channel. The aggregator expects at least 2 messages per group; otherwise it waits 10 seconds and then times out. I am also using the JDBC message store.

Following are the scenarios I have tested.

Scenario 1 is working fine

Sending Message 1 and 2 -> Input Channel (input1) -> Aggregator 1 -> Output Channel (output1)

Scenario 2 is working fine

Sending Message 1 and 2 -> Input Channel (input2) -> Aggregator 2 -> Output Channel (output2)

Scenario 3 is working fine

Sending Message 1 only -> Input Channel (input1) -> Aggregator 1 -> Output Channel (output1)

Scenario 4 is failing: the expired message is sent to output1 instead of output2

Sending Message 1 only -> Input Channel (input2) -> Aggregator 2 -> Output Channel (output1)

Can anyone suggest why scenario 4 is failing?

Following is my configuration

<int:service-activator ref="activator" method="output1_activator" input-channel="output1" /> 

<int:service-activator ref="activator" method="output2_activator" input-channel="output2" /> 

<int:aggregator input-channel="input1" 
    output-channel="output1" 
    ref="waiter" 
    expire-groups-upon-completion="true" 
    send-partial-result-on-expiry="true" 
    message-store="myJdbcMessageStore" /> 

<int:aggregator input-channel="input2" 
    output-channel="output2" 
    ref="waiter" 
    expire-groups-upon-completion="true" 
    send-partial-result-on-expiry="true" 
    message-store="myJdbcMessageStore" />

<bean id="aggregatorJdbcDataSource" class="o.s.j.d.DriverManagerDataSource"> ..... </bean> 

<bean id="myJdbcMessageStore" class="org.springframework.integration.jdbc.JdbcMessageStore"> 
    <constructor-arg index="0" ref="aggregatorJdbcDataSource" /> 
</bean> 

<bean id="telMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper"> 
    <property name="messageGroupStore" ref="myJdbcMessageStore" /> 
    <property name="timeout" value="10000" /> 
</bean> 

<task:scheduled-tasks> 
    <task:scheduled ref="telMessageStoreReaper" method="run" fixed-rate="5000" /> 
</task:scheduled-tasks>

Solution

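  • You can't use the same message store instance for both aggregators. When the reaper expires a group in a shared store, it doesn't know which aggregator owns the group, so the group can be released by the wrong aggregator. That is why the message from input2 ends up on output1.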

    You can use the same tables, but you need separate message store instances; see partitioning a message store.

    You need two stores, each with a different region.

    You could also consider using group-timeout instead of the reaper.

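As a rough sketch of the two-store setup described above, the configuration could look something like the following. It reuses `aggregatorJdbcDataSource`, `waiter`, and the channel names from the question. The bean ids (`messageStore1`, `messageStore2`, `reaper1`, `reaper2`) and the region values (`AGGREGATOR_1`, `AGGREGATOR_2`) are made up for illustration. It assumes each store needs its own reaper, since a reaper is tied to a single store.

```xml
<!-- Sketch only: bean ids and region names are hypothetical. -->

<!-- One store instance per aggregator. Both use the same data source and
     tables; the region keeps their message groups apart. -->
<bean id="messageStore1" class="org.springframework.integration.jdbc.JdbcMessageStore">
    <constructor-arg index="0" ref="aggregatorJdbcDataSource" />
    <property name="region" value="AGGREGATOR_1" />
</bean>

<bean id="messageStore2" class="org.springframework.integration.jdbc.JdbcMessageStore">
    <constructor-arg index="0" ref="aggregatorJdbcDataSource" />
    <property name="region" value="AGGREGATOR_2" />
</bean>

<!-- Each aggregator points at its own store. -->
<int:aggregator input-channel="input1"
    output-channel="output1"
    ref="waiter"
    expire-groups-upon-completion="true"
    send-partial-result-on-expiry="true"
    message-store="messageStore1" />

<int:aggregator input-channel="input2"
    output-channel="output2"
    ref="waiter"
    expire-groups-upon-completion="true"
    send-partial-result-on-expiry="true"
    message-store="messageStore2" />

<!-- One reaper per store, so an expired group is only ever handed to the
     aggregator registered with that store. -->
<bean id="reaper1" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore1" />
    <property name="timeout" value="10000" />
</bean>

<bean id="reaper2" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore2" />
    <property name="timeout" value="10000" />
</bean>

<task:scheduled-tasks>
    <task:scheduled ref="reaper1" method="run" fixed-rate="5000" />
    <task:scheduled ref="reaper2" method="run" fixed-rate="5000" />
</task:scheduled-tasks>
```

If your Spring Integration version supports the `group-timeout` attribute, adding something like `group-timeout="10000"` to each `<int:aggregator>` should let you drop the reaper beans and the scheduled tasks, because each aggregator then times out its own groups. Keeping a separate store per aggregator is probably still the safer choice in that case.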