I am trying to understand the Aggregator basics. Below is the use case I am trying to implement:
1) Read message (order details) from queue.
<?xml version="1.0" encoding="UTF-8"?>
<order xmlns="http://www.example.org/orders">
<orderItem>
<isbn>12333454443</isbn>
<quantity>4</quantity>
</orderItem>
<orderItem>
<isbn>545656777</isbn>
<quantity>50</quantity>
</orderItem>
..
..
</order>
One order message will contain multiple orderItem. And we can expect hundreds of order messages in the queue.
2) End Result ::
a) Each orderitem should be written to a file.
b) 4 such files should be written to a unique folder.
To give an example, lets say we got two order messages - each containing three orderitem.
So we need to create 2 folders :
In "folder 1", there should be 4 files(1 orderitem in each file)
In "folder 2", there should be 2 files(1 orderitem in each file). Here for simplicity we assume no more order messages came and we can write after 5 mins.
Implementation:
I unable to get the aggregator to work as per my understanding.
Need pointers on how to implement this use case where I can group them in 4 and write to unique folders. Please note that the orderitem are all independent book orders and have no relation amongst them.
Below is the configuration.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans">
<int:channel id="mqInbound"/>
<int:channel id="item"/>
<int:channel id="itemList"/>
<int:channel id="aggregatorDiscardChannel"/>
<int-jms:message-driven-channel-adapter id="jmsIn"
channel="mqInbound"
destination="requestQueue"
message- converter="orderMessageConverter"/>
<int:splitter input-channel="mqInbound" output-channel="item" expression="payload.orderItem"/>
<int:chain id="aggregateList" input-channel="item" output-channel="itemList" >
<int:header-enricher>
<int:header name="sequenceSize" expression="4" overwrite="true"/>
</int:header-enricher>
<int:aggregator correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders" discard-channel="aggregatorDiscardChannel" />
</int:chain>
<int:service-activator input-channel="itemList" ref="displayAggregatedList" method="display"/>
<int:service-activator input-channel="aggregatorDiscardChannel" ref="displayAggregatedList" method="displayDiscarded"/>
<bean id="orderAggregator" class="com.samples.Aggregator.OrderAggregator"/>
<bean id="displayAggregatedList" class="com.samples.Aggregator.DisplayAggregatedList"/>
...
....
</beans>
public class OrderAggregator {
@Aggregator
public List<OrderItemType> sendList(List<OrderItemType> orderItemTypeList) {
return orderItemTypeList;
}
@CorrelationStrategy
public String groupOrders( OrderItemType orderItemType) {
return "items";
}
}
public class DisplayAggregatedList {
public void display(List <OrderItemType> orderItemTypeList) {
System.out.println("######## Display Aggregated ##############");
for(OrderItemType oit : orderItemTypeList) {
System.out.println("### Isbn :" + oit.getIsbn() + ":: Quantity :" + oit.getQuantity());
}
}
public void displayDiscarded(Message<?> message) {
System.out.println("######## Display Discarded ##############" + message);
}
}
What you need is called expire-groups-upon-completion
:
When set to true (default false), completed groups are removed from the message store, allowing subsequent messages with the same correlation to form a new group. The default behavior is to send messages with the same correlation as a completed group to the discard-channel.
If you need to release uncompleted groups anyway (2 orders left, for example), consider to use group-timeout
: http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to