mule aggregator

How to create a custom aggregator in Mule?


What is the recommended way to create a completely custom aggregator in Mule 3.x? By completely custom, I mean one that follows my own logic and does not rely on correlation IDs, message counts, etc.

The documentation on the MuleSoft site is outdated: it says to use AbstractEventAggregator, which does not exist in 3.x:

http://www.mulesoft.org/documentation/display/MULE3USER/Message+Splitting+and+Aggregatio

Digging deeper, it looks like this class has been renamed to AbstractAggregator in 3.x:

http://www.mulesoft.org/docs/site/3.2.0/apidocs/org/mule/routing/AbstractAggregator.html

However, there are no examples that show how to use this. The LoanBroker example described in the first link above actually uses a correlation aggregator (in the 2.x examples, which I assume is what the document is referring to).

At one point, there was an abstract class that had abstract methods shouldAggregate and doAggregate. This is the kind of class I would like to extend.


Solution

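  • Look at TestAggregator below for an example of subclassing AbstractAggregator. It overrides getCorrelatorCallback and returns a callback whose aggregateEvents method holds the custom aggregation logic (here, concatenating the payloads of all events in the group).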

    import org.mule.DefaultMuleEvent;
    import org.mule.DefaultMuleMessage;
    import org.mule.api.MuleContext;
    import org.mule.api.MuleEvent;
    import org.mule.api.store.ObjectStoreException;
    import org.mule.api.transformer.TransformerException;
    import org.mule.routing.AbstractAggregator;
    import org.mule.routing.AggregationException;
    import org.mule.routing.EventGroup;
    import org.mule.routing.correlation.CollectionCorrelatorCallback;
    import org.mule.routing.correlation.EventCorrelatorCallback;

    import java.util.Iterator;

    public class TestAggregator extends AbstractAggregator
    {
        @Override
        protected EventCorrelatorCallback getCorrelatorCallback(MuleContext muleContext)
        {
            // Reuse the collection callback for group bookkeeping;
            // only the aggregation step is customized here.
            return new CollectionCorrelatorCallback(muleContext, false, storePrefix)
            {
                @Override
                public MuleEvent aggregateEvents(EventGroup events) throws AggregationException
                {
                    StringBuilder buffer = new StringBuilder(128);

                    try
                    {
                        // Concatenate the string form of every event in the group.
                        for (Iterator<MuleEvent> iterator = events.iterator(); iterator.hasNext();)
                        {
                            MuleEvent event = iterator.next();
                            try
                            {
                                buffer.append(event.transformMessageToString());
                            }
                            catch (TransformerException e)
                            {
                                throw new AggregationException(events, null, e);
                            }
                        }
                    }
                    catch (ObjectStoreException e)
                    {
                        // Reading the group's events from the object store failed.
                        throw new AggregationException(events, null, e);
                    }

                    logger.debug("event payload is: " + buffer.toString());
                    // Wrap the combined payload in a new event based on the group's collection event.
                    return new DefaultMuleEvent(new DefaultMuleMessage(buffer.toString(), muleContext), events.getMessageCollectionEvent());
                }
            };
        }
    }
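
The question also asks for release logic that does not depend on correlation IDs or message counts. In Mule 3.x that decision appears to belong to the callback's shouldAggregateEvents(EventGroup) method, which the anonymous class above could override alongside aggregateEvents; treat that as an assumption to check against the Javadoc for your version. As a rough illustration of the shouldAggregate/doAggregate contract itself, here is a plain-Java sketch with no Mule dependency. The names SimpleAggregator, EndMarkerAggregator, and accept are hypothetical and are not part of the Mule API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Mule-free illustration of a shouldAggregate/doAggregate contract.
abstract class SimpleAggregator<T, R> {
    private final List<T> pending = new ArrayList<>();

    /** Decide, from the items collected so far, whether to release the group. */
    protected abstract boolean shouldAggregate(List<T> items);

    /** Combine the collected items into a single result. */
    protected abstract R doAggregate(List<T> items);

    /** Add an item; returns the aggregate when the group is released, otherwise null. */
    public R accept(T item) {
        pending.add(item);
        if (!shouldAggregate(pending)) {
            return null;
        }
        R result = doAggregate(new ArrayList<>(pending));
        pending.clear(); // start a fresh group after releasing
        return result;
    }
}

// Example rule (an assumption for illustration): release when the item "END" arrives,
// concatenating everything that came before it.
class EndMarkerAggregator extends SimpleAggregator<String, String> {
    @Override
    protected boolean shouldAggregate(List<String> items) {
        return "END".equals(items.get(items.size() - 1));
    }

    @Override
    protected String doAggregate(List<String> items) {
        StringBuilder sb = new StringBuilder();
        for (String s : items.subList(0, items.size() - 1)) {
            sb.append(s);
        }
        return sb.toString();
    }
}
```

In a Mule aggregator, the same two decisions would map onto the callback: the release rule goes where the framework asks whether a group is complete, and the combining step goes into aggregateEvents as shown in TestAggregator.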