Tags: spring, spring-integration, spring-batch, spring-batch-admin

Spring Integration Aggregator: aggregate input into groups


I want to aggregate data into groups, and I would like to use the Spring Integration aggregator for this. How can I do that? My scenario looks like this:

Spring Batch<-->Spring Integration File-->Reader-->Processor<-->[Gateway<->Aggregator]

{Male,Joe;Male,Dave;Female,Anne;Female,Jane}-->sequentially to-->Gateway-->Aggregate-->Gender(Male,{Joe,Dave}) ... Gender(Female,{Anne,Jane})

What does the release strategy look like? I need small code snippets.

Thanks


Solution

  • I'm not sure what the Reader is in your case, but Spring Integration provides a FileSplitter component. You can use its markers to determine when the aggregator is ready to release.

    I would say that you need a release strategy that always evaluates to false:

    release-strategy-expression="false"
    

    That means your groups won't be released under normal conditions, because you can't know in advance how large a group must be before it is ready to release.
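
    A minimal sketch of such an aggregator, assuming the payloads are strings like "Male,Joe" and that the channel names (lines, aggregated) are placeholders rather than anything from your setup. The int prefix stands for the Spring Integration core namespace, and the correlation expression groups messages by the text before the comma:

    ```xml
    <!-- Hypothetical channel names; adapt them to your flow. -->
    <!-- Correlates "Male,Joe" and "Male,Dave" into one group keyed by "Male". -->
    <int:aggregator input-channel="lines"
                    output-channel="aggregated"
                    correlation-strategy-expression="payload.split(',')[0]"
                    release-strategy-expression="false"/>
    ```

    On its own this configuration never emits anything: the groups simply accumulate in the message store until something expires them.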

    The FileSplitter.FileMarker.Mark.END marker, which the FileSplitter (with markers enabled) emits as its last message after reading the last line of the file, together with a MessageGroupStore.expireMessageGroups(0) call, makes the aggregator work as intended. However, you still need to configure this attribute:

    <xsd:attribute name="send-partial-result-on-expiry" type="xsd:string">
        <xsd:annotation>
            <xsd:documentation>
            Specifies whether messages that expired should be aggregated and sent to the 'output-channel' or 'replyChannel'.
            Messages are expired when their containing MessageGroup expires. One of the ways of expiring MessageGroups is by
            configuring a MessageGroupStoreReaper. However MessageGroups can alternatively be expired by simply calling
            MessageGroupStore.expireMessageGroups(timeout). That could be accomplished via a ControlBus operation
            or by simply invoking that method if you have a reference to the MessageGroupStore instance.
            </xsd:documentation>
        </xsd:annotation>
    </xsd:attribute>
    

    Set this attribute to true so that the groups are released normally when they expire.
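
    Putting the pieces together, the whole flow could be sketched roughly like this. It is an untested outline under several assumptions: all bean and channel names (messageStore, files, splitOutput, lines, markers, endMarkers, aggregated) are made up for illustration, the lines look like "Male,Joe", and the channels are the default direct channels, so the END marker is handled on the same thread only after every line has reached the aggregator:

    ```xml
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:int="http://www.springframework.org/schema/integration"
           xmlns:int-file="http://www.springframework.org/schema/integration/file"
           xsi:schemaLocation="
             http://www.springframework.org/schema/beans
             https://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/integration
             https://www.springframework.org/schema/integration/spring-integration.xsd
             http://www.springframework.org/schema/integration/file
             https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">

        <!-- In-memory store shared by the aggregator and the expiry call below. -->
        <bean id="messageStore"
              class="org.springframework.integration.store.SimpleMessageStore"/>

        <!-- Queue channel so the aggregated groups can be polled by a consumer. -->
        <int:channel id="aggregated">
            <int:queue/>
        </int:channel>

        <!-- Emits one message per line, plus START and END FileMarker messages. -->
        <int-file:splitter input-channel="files"
                           output-channel="splitOutput"
                           markers="true"/>

        <!-- Lines go to the aggregator, markers go to the expiry branch. -->
        <int:payload-type-router input-channel="splitOutput">
            <int:mapping type="java.lang.String" channel="lines"/>
            <int:mapping type="org.springframework.integration.file.splitter.FileSplitter$FileMarker"
                         channel="markers"/>
        </int:payload-type-router>

        <!-- Never releases on its own; groups are sent out only when expired. -->
        <int:aggregator input-channel="lines"
                        output-channel="aggregated"
                        message-store="messageStore"
                        correlation-strategy-expression="payload.split(',')[0]"
                        release-strategy-expression="false"
                        send-partial-result-on-expiry="true"/>

        <!-- Pass only the END marker; the START marker is silently dropped. -->
        <int:filter input-channel="markers"
                    output-channel="endMarkers"
                    expression="payload.mark.name() == 'END'"/>

        <!-- Expire every group in the store; the returned count is discarded. -->
        <int:service-activator input-channel="endMarkers"
                               output-channel="nullChannel"
                               expression="@messageStore.expireMessageGroups(0)"/>
    </beans>
    ```

    With the sample input, the aggregated channel should then receive one message per gender, each carrying the list of collected lines. If any of the channels is made asynchronous, the END marker may overtake lines that are still in flight, so the ordering assumption above no longer holds.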