spring-integrationaggregatoraggregators

Unable to get Aggregator to work


I am trying to understand the Aggregator basics. Below is the use case I am trying to implement:

1) Read message (order details) from queue.

<?xml version="1.0" encoding="UTF-8"?>
<order xmlns="http://www.example.org/orders">
  <orderItem>
    <isbn>12333454443</isbn>
    <quantity>4</quantity>
  </orderItem>
  <orderItem>
    <isbn>545656777</isbn>
    <quantity>50</quantity>
  </orderItem>
..
..
</order>

One order message will contain multiple orderItem. And we can expect hundreds of order messages in the queue.

2) End Result ::

a) Each orderitem should be written to a file.

b) 4 such files should be written to a unique folder.

To give an example, lets say we got two order messages - each containing three orderitem.

So we need to create 2 folders :

In "folder 1", there should be 4 files(1 orderitem in each file)

In "folder 2", there should be 2 files(1 orderitem in each file). Here for simplicity we assume no more order messages came and we can write after 5 mins.

Implementation:


  1. I am able to read the message from the queue (websphere MQ) and unmarshall the message successfully.
  2. Used splitter to split the message based on orderitem count.
  3. Used Aggregator to group the message in size of 4.

I unable to get the aggregator to work as per my understanding.

  1. I push one order when 4 orderitem, the message is getting aggregated correctly.
  2. I push one order with 5 orderitem, the first 4 is getting aggregated but the last one is sent to discard channel. This is expected as the MessageGroup is released so the last message is discarded.
  3. I push two orders each containing 2 orderitem. The last 2 orderitem are sent to discard channel.
    The correlation strategy is hardcoded (OrderAggregator.java) but the above case should have worked.

Need pointers on how to implement this use case where I can group them in 4 and write to unique folders. Please note that the orderitem are all independent book orders and have no relation amongst them.

Below is the configuration.

spring-bean.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans">
  <int:channel id="mqInbound"/>
  <int:channel id="item"/>
  <int:channel id="itemList"/>
  <int:channel id="aggregatorDiscardChannel"/>

  <int-jms:message-driven-channel-adapter id="jmsIn"
                                      channel="mqInbound"
                                      destination="requestQueue" 
                                      message-   converter="orderMessageConverter"/>

  <int:splitter input-channel="mqInbound"  output-channel="item" expression="payload.orderItem"/>

  <int:chain id="aggregateList" input-channel="item" output-channel="itemList"  >
    <int:header-enricher>
      <int:header name="sequenceSize" expression="4" overwrite="true"/>
    </int:header-enricher>
    <int:aggregator  correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders"  discard-channel="aggregatorDiscardChannel" />
  </int:chain>

  <int:service-activator input-channel="itemList"                 ref="displayAggregatedList" method="display"/>
  <int:service-activator input-channel="aggregatorDiscardChannel" ref="displayAggregatedList" method="displayDiscarded"/>

  <bean id="orderAggregator"       class="com.samples.Aggregator.OrderAggregator"/>
  <bean id="displayAggregatedList" class="com.samples.Aggregator.DisplayAggregatedList"/>
  ...
  ....
</beans>

OrderAggregator.java

public class OrderAggregator {

@Aggregator
public List<OrderItemType> sendList(List<OrderItemType> orderItemTypeList) {

    return orderItemTypeList;
}

@CorrelationStrategy
public String groupOrders( OrderItemType orderItemType) {

    return "items";
}

}

DisplayAggregatedList.java

public class DisplayAggregatedList {

public void display(List <OrderItemType> orderItemTypeList) {

    System.out.println("######## Display Aggregated ##############");
    for(OrderItemType oit : orderItemTypeList) {
        System.out.println("### Isbn :" + oit.getIsbn() + ":: Quantity :" + oit.getQuantity());
    }
}
public void displayDiscarded(Message<?> message) {

    System.out.println("######## Display Discarded ##############" + message);
}
}   

Solution

  • What you need is called expire-groups-upon-completion:

    When set to true (default false), completed groups are removed from the message store, allowing subsequent messages with the same correlation to form a new group. The default behavior is to send messages with the same correlation as a completed group to the discard-channel.

    If you need to release uncompleted groups anyway (2 orders left, for example), consider to use group-timeout: http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to