java apache-camel aggregator

Camel Aggregator and completionPredicate by example


I have the following Camel route:

<route id="myRoute">
    <from uri="direct:aggregator" />

    <aggregate strategyRef="aggregatorStrategy" completionInterval="60000" completionSize="500">
        <correlationExpression>
            <xpath>/fizz/buzz</xpath>
        </correlationExpression>

        <to uri="bean:postProcessor?method=run" />
    </aggregate>
</route>

As you can see, it completes an aggregation either once the <aggregate> has received 500 messages or when the 1-minute interval elapses, whichever comes first, and then sends the aggregated message on to a bean called postProcessor.

You can think of this aggregation logic as follows:

AGGREGATE UNTIL:
    We have received 500 messages
    OR
    1 minute has elapsed

THEN:
    Send to postProcessor

Or in pseudo-code: aggregateUntil(weHave500Message() || 1minHasElapsed()). I'd like to change this logic to:

AGGREGATE UNTIL:
    We have received 500 messages
    OR
    1 minute has elapsed
    OR
    A message is received that has a property called "fireNow" and a value of "true"

THEN:
    Send to postProcessor

Or, again in pseudo-code: aggregateUntil(weHave500Message() || 1minHasElapsed() || messageHasProperty("fireNow", "true")).

In other words, aggregate until any one of the three conditions is met. Any ideas how I could implement this? I have a feeling I can finagle this with completionPredicate and perhaps eagerCheckCompletion, but I'm not seeing the forest for the trees here.


Solution

  • You can use completionSize and completionInterval together and add a completionPredicate as the third condition, like this. You do need eagerCheckCompletion="true", because the predicate has to test the incoming message, not the aggregated message:

    <route>
        <from uri="direct:aggregator" />
        <aggregate strategyRef="aggregatorStrategy" completionSize="500" completionInterval="60000" eagerCheckCompletion="true">
            <correlationExpression>
                 <xpath>/fizz/buzz</xpath>
            </correlationExpression>
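            <completionPredicate>
                 <!-- Evaluated against the incoming exchange because eagerCheckCompletion="true". -->
                 <!-- Newer Camel versions spell this ${exchangeProperty.fireNow}. -->
                 <simple>${property.fireNow} == 'true'</simple>
            </completionPredicate>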
            <to uri="bean:postProcessor?method=run" />
        </aggregate>
    </route>
    

    An alternative would be to build a compound predicate that tests all three conditions and use it as the only completionPredicate, but that solution is inferior. The interval timeout is triggered by a separate background thread, whereas a predicate is evaluated only when a message arrives, so with a compound predicate alone nothing would trigger the aggregation when the timeout expires and no new message comes in.
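
For the completionPredicate in the route above to fire, something upstream has to set the fireNow exchange property before the message reaches direct:aggregator. A minimal sketch of such a route, assuming Camel 2.x Spring XML; the route id flushTrigger and the endpoint direct:flush are hypothetical names chosen for illustration, not part of the original routes:

```xml
<route id="flushTrigger">
    <!-- Hypothetical entry point: anything sent here forces completion. -->
    <from uri="direct:flush" />

    <!-- Mark the exchange so the aggregator's completion predicate matches. -->
    <!-- Assumption: Camel 2.x attribute name; Camel 3+ uses name="fireNow". -->
    <setProperty propertyName="fireNow">
        <constant>true</constant>
    </setProperty>

    <!-- direct: hands over the same exchange, so the property is preserved. -->
    <to uri="direct:aggregator" />
</route>
```

The triggering message still goes through the correlationExpression, so its body needs a /fizz/buzz value, and only the group with that correlation key is completed. If fireNow is actually a message header rather than an exchange property, the predicate would read ${header.fireNow} == 'true' instead.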